linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! We implement two types:
5//! 1) The first type `KeyValueStoreView` implements View and the function of `KeyValueStore`.
6//!
7//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
8//!    * The `Clone` trait
9//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
10//!    * a `write_batch` that writes in the context instead of writing of the view.
11//!
12//! Currently, that second type is only used for tests.
13//!
14//! Key tags to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24    batch::{Batch, WriteOperation},
25    common::{
26        from_bytes_option, from_bytes_option_or_default, get_interval, get_upper_bound,
27        DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28    },
29    context::Context,
30    map_view::ByteMapView,
31    store::ReadableKeyValueStore,
32    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
33};
34
35#[cfg(with_metrics)]
36mod metrics {
37    use std::sync::LazyLock;
38
39    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
40    use prometheus::HistogramVec;
41
42    /// The latency of hash computation
43    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
44        register_histogram_vec(
45            "key_value_store_view_hash_latency",
46            "KeyValueStoreView hash latency",
47            &[],
48            exponential_bucket_latencies(5.0),
49        )
50    });
51
52    /// The latency of get operation
53    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
54        register_histogram_vec(
55            "key_value_store_view_get_latency",
56            "KeyValueStoreView get latency",
57            &[],
58            exponential_bucket_latencies(5.0),
59        )
60    });
61
62    /// The latency of multi get
63    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
64        LazyLock::new(|| {
65            register_histogram_vec(
66                "key_value_store_view_multi_get_latency",
67                "KeyValueStoreView multi get latency",
68                &[],
69                exponential_bucket_latencies(5.0),
70            )
71        });
72
73    /// The latency of contains key
74    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
75        LazyLock::new(|| {
76            register_histogram_vec(
77                "key_value_store_view_contains_key_latency",
78                "KeyValueStoreView contains key latency",
79                &[],
80                exponential_bucket_latencies(5.0),
81            )
82        });
83
84    /// The latency of contains keys
85    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
86        LazyLock::new(|| {
87            register_histogram_vec(
88                "key_value_store_view_contains_keys_latency",
89                "KeyValueStoreView contains keys latency",
90                &[],
91                exponential_bucket_latencies(5.0),
92            )
93        });
94
95    /// The latency of find keys by prefix operation
96    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
97        LazyLock::new(|| {
98            register_histogram_vec(
99                "key_value_store_view_find_keys_by_prefix_latency",
100                "KeyValueStoreView find keys by prefix latency",
101                &[],
102                exponential_bucket_latencies(5.0),
103            )
104        });
105
106    /// The latency of find key values by prefix operation
107    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
108        LazyLock::new(|| {
109            register_histogram_vec(
110                "key_value_store_view_find_key_values_by_prefix_latency",
111                "KeyValueStoreView find key values by prefix latency",
112                &[],
113                exponential_bucket_latencies(5.0),
114            )
115        });
116
117    /// The latency of write batch operation
118    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
119        LazyLock::new(|| {
120            register_histogram_vec(
121                "key_value_store_view_write_batch_latency",
122                "KeyValueStoreView write batch latency",
123                &[],
124                exponential_bucket_latencies(5.0),
125            )
126        });
127}
128
129#[cfg(with_testing)]
130use {
131    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
132    async_lock::RwLock,
133    std::sync::Arc,
134    thiserror::Error,
135};
136
137#[repr(u8)]
138enum KeyTag {
139    /// Prefix for the indices of the view.
140    Index = MIN_VIEW_TAG,
141    /// The total stored size
142    TotalSize,
143    /// The prefix where the sizes are being stored
144    Sizes,
145    /// Prefix for the hash.
146    Hash,
147}
148
149/// A pair containing the key and value size.
150#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
151pub struct SizeData {
152    /// The size of the key
153    pub key: u32,
154    /// The size of the value
155    pub value: u32,
156}
157
158impl SizeData {
159    /// Sums both terms
160    pub fn sum(&mut self) -> u32 {
161        self.key + self.value
162    }
163
164    /// Adds a size to `self`
165    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
166        self.key = self
167            .key
168            .checked_add(size.key)
169            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
170        self.value = self
171            .value
172            .checked_add(size.value)
173            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
174        Ok(())
175    }
176
177    /// Subtracts a size from `self`
178    pub fn sub_assign(&mut self, size: SizeData) {
179        self.key -= size.key;
180        self.value -= size.value;
181    }
182}
183
184/// A view that represents the functions of `KeyValueStore`.
185///
186/// Comment on the data set:
187/// In order to work, the view needs to store the updates and deleted prefixes.
188/// The updates and deleted prefixes have to be coherent. This means:
189/// * If an index is deleted by one in deleted prefixes then it should not be present
190///   in the updates at all.
191/// * [`DeletePrefix::key_prefix`][entry1] should not dominate anyone. That is if we have `[0,2]`
192///   then we should not have `[0,2,3]` since it would be dominated by the preceding.
193///
194/// With that we have:
195/// * in order to test if an `index` is deleted by a prefix we compute the highest deleted prefix `dp`
196///   such that `dp <= index`.
197///   If `dp` is indeed a prefix then we conclude that `index` is deleted, otherwise not.
198///   The no domination is essential here.
199///
200/// [entry1]: crate::batch::WriteOperation::DeletePrefix
201#[derive(Debug)]
202pub struct KeyValueStoreView<C> {
203    context: C,
204    deletion_set: DeletionSet,
205    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
206    stored_total_size: SizeData,
207    total_size: SizeData,
208    sizes: ByteMapView<C, u32>,
209    stored_hash: Option<HasherOutput>,
210    hash: Mutex<Option<HasherOutput>>,
211}
212
213impl<C: Context> View for KeyValueStoreView<C> {
214    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;
215
216    type Context = C;
217
218    fn context(&self) -> &C {
219        &self.context
220    }
221
222    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
223        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
224        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
225        let mut v = vec![key_hash, key_total_size];
226        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
227        let context_sizes = context.clone_with_base_key(base_key);
228        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
229        Ok(v)
230    }
231
232    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
233        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
234        let total_size =
235            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
236        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
237        let context_sizes = context.clone_with_base_key(base_key);
238        let sizes = ByteMapView::post_load(
239            context_sizes,
240            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
241        )?;
242        Ok(Self {
243            context,
244            deletion_set: DeletionSet::new(),
245            updates: BTreeMap::new(),
246            stored_total_size: total_size,
247            total_size,
248            sizes,
249            stored_hash: hash,
250            hash: Mutex::new(hash),
251        })
252    }
253
254    async fn load(context: C) -> Result<Self, ViewError> {
255        let keys = Self::pre_load(&context)?;
256        let values = context.store().read_multi_values_bytes(keys).await?;
257        Self::post_load(context, &values)
258    }
259
260    fn rollback(&mut self) {
261        self.deletion_set.rollback();
262        self.updates.clear();
263        self.total_size = self.stored_total_size;
264        self.sizes.rollback();
265        *self.hash.get_mut().unwrap() = self.stored_hash;
266    }
267
268    async fn has_pending_changes(&self) -> bool {
269        if self.deletion_set.has_pending_changes() {
270            return true;
271        }
272        if !self.updates.is_empty() {
273            return true;
274        }
275        if self.stored_total_size != self.total_size {
276            return true;
277        }
278        if self.sizes.has_pending_changes().await {
279            return true;
280        }
281        let hash = self.hash.lock().unwrap();
282        self.stored_hash != *hash
283    }
284
285    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
286        let mut delete_view = false;
287        if self.deletion_set.delete_storage_first {
288            delete_view = true;
289            self.stored_total_size = SizeData::default();
290            batch.delete_key_prefix(self.context.base_key().bytes.clone());
291            for (index, update) in mem::take(&mut self.updates) {
292                if let Update::Set(value) = update {
293                    let key = self
294                        .context
295                        .base_key()
296                        .base_tag_index(KeyTag::Index as u8, &index);
297                    batch.put_key_value_bytes(key, value);
298                    delete_view = false;
299                }
300            }
301            self.stored_hash = None
302        } else {
303            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
304                let key = self
305                    .context
306                    .base_key()
307                    .base_tag_index(KeyTag::Index as u8, &index);
308                batch.delete_key_prefix(key);
309            }
310            for (index, update) in mem::take(&mut self.updates) {
311                let key = self
312                    .context
313                    .base_key()
314                    .base_tag_index(KeyTag::Index as u8, &index);
315                match update {
316                    Update::Removed => batch.delete_key(key),
317                    Update::Set(value) => batch.put_key_value_bytes(key, value),
318                }
319            }
320        }
321        self.sizes.flush(batch)?;
322        let hash = *self.hash.get_mut().unwrap();
323        if self.stored_hash != hash {
324            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
325            match hash {
326                None => batch.delete_key(key),
327                Some(hash) => batch.put_key_value(key, &hash)?,
328            }
329            self.stored_hash = hash;
330        }
331        if self.stored_total_size != self.total_size {
332            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
333            batch.put_key_value(key, &self.total_size)?;
334            self.stored_total_size = self.total_size;
335        }
336        self.deletion_set.delete_storage_first = false;
337        Ok(delete_view)
338    }
339
340    fn clear(&mut self) {
341        self.deletion_set.clear();
342        self.updates.clear();
343        self.total_size = SizeData::default();
344        self.sizes.clear();
345        *self.hash.get_mut().unwrap() = None;
346    }
347}
348
349impl<C: Context> ClonableView for KeyValueStoreView<C> {
350    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
351        Ok(KeyValueStoreView {
352            context: self.context.clone(),
353            deletion_set: self.deletion_set.clone(),
354            updates: self.updates.clone(),
355            stored_total_size: self.stored_total_size,
356            total_size: self.total_size,
357            sizes: self.sizes.clone_unchecked()?,
358            stored_hash: self.stored_hash,
359            hash: Mutex::new(*self.hash.get_mut().unwrap()),
360        })
361    }
362}
363
364impl<C: Context> KeyValueStoreView<C> {
365    fn max_key_size(&self) -> usize {
366        let prefix_len = self.context.base_key().bytes.len();
367        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
368    }
369
370    /// Getting the total sizes that will be used for keys and values when stored
371    /// ```rust
372    /// # tokio_test::block_on(async {
373    /// # use linera_views::context::MemoryContext;
374    /// # use linera_views::key_value_store_view::{KeyValueStoreView, SizeData};
375    /// # use linera_views::views::View;
376    /// # let context = MemoryContext::new_for_testing(());
377    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
378    /// let total_size = view.total_size();
379    /// assert_eq!(total_size, SizeData::default());
380    /// # })
381    /// ```
382    pub fn total_size(&self) -> SizeData {
383        self.total_size
384    }
385
386    /// Applies the function f over all indices. If the function f returns
387    /// false, then the loop ends prematurely.
388    /// ```rust
389    /// # tokio_test::block_on(async {
390    /// # use linera_views::context::MemoryContext;
391    /// # use linera_views::key_value_store_view::KeyValueStoreView;
392    /// # use linera_views::views::View;
393    /// # let context = MemoryContext::new_for_testing(());
394    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
395    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
396    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
397    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
398    /// let mut count = 0;
399    /// view.for_each_index_while(|_key| {
400    ///     count += 1;
401    ///     Ok(count < 2)
402    /// })
403    /// .await
404    /// .unwrap();
405    /// assert_eq!(count, 2);
406    /// # })
407    /// ```
408    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
409    where
410        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
411    {
412        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
413        let mut updates = self.updates.iter();
414        let mut update = updates.next();
415        if !self.deletion_set.delete_storage_first {
416            let mut suffix_closed_set =
417                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
418            for index in self
419                .context
420                .store()
421                .find_keys_by_prefix(&key_prefix)
422                .await?
423            {
424                loop {
425                    match update {
426                        Some((key, value)) if key <= &index => {
427                            if let Update::Set(_) = value {
428                                if !f(key)? {
429                                    return Ok(());
430                                }
431                            }
432                            update = updates.next();
433                            if key == &index {
434                                break;
435                            }
436                        }
437                        _ => {
438                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
439                                return Ok(());
440                            }
441                            break;
442                        }
443                    }
444                }
445            }
446        }
447        while let Some((key, value)) = update {
448            if let Update::Set(_) = value {
449                if !f(key)? {
450                    return Ok(());
451                }
452            }
453            update = updates.next();
454        }
455        Ok(())
456    }
457
458    /// Applies the function f over all indices.
459    /// ```rust
460    /// # tokio_test::block_on(async {
461    /// # use linera_views::context::MemoryContext;
462    /// # use linera_views::key_value_store_view::KeyValueStoreView;
463    /// # use linera_views::views::View;
464    /// # let context = MemoryContext::new_for_testing(());
465    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
466    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
467    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
468    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
469    /// let mut count = 0;
470    /// view.for_each_index(|_key| {
471    ///     count += 1;
472    ///     Ok(())
473    /// })
474    /// .await
475    /// .unwrap();
476    /// assert_eq!(count, 3);
477    /// # })
478    /// ```
479    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
480    where
481        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
482    {
483        self.for_each_index_while(|key| {
484            f(key)?;
485            Ok(true)
486        })
487        .await
488    }
489
490    /// Applies the function f over all index/value pairs.
491    /// If the function f returns false then the loop ends prematurely.
492    /// ```rust
493    /// # tokio_test::block_on(async {
494    /// # use linera_views::context::MemoryContext;
495    /// # use linera_views::key_value_store_view::KeyValueStoreView;
496    /// # use linera_views::views::View;
497    /// # let context = MemoryContext::new_for_testing(());
498    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
499    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
500    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
501    /// let mut values = Vec::new();
502    /// view.for_each_index_value_while(|_key, value| {
503    ///     values.push(value.to_vec());
504    ///     Ok(values.len() < 1)
505    /// })
506    /// .await
507    /// .unwrap();
508    /// assert_eq!(values, vec![vec![0]]);
509    /// # })
510    /// ```
511    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
512    where
513        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
514    {
515        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
516        let mut updates = self.updates.iter();
517        let mut update = updates.next();
518        if !self.deletion_set.delete_storage_first {
519            let mut suffix_closed_set =
520                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
521            for entry in self
522                .context
523                .store()
524                .find_key_values_by_prefix(&key_prefix)
525                .await?
526            {
527                let (index, index_val) = entry;
528                loop {
529                    match update {
530                        Some((key, value)) if key <= &index => {
531                            if let Update::Set(value) = value {
532                                if !f(key, value)? {
533                                    return Ok(());
534                                }
535                            }
536                            update = updates.next();
537                            if key == &index {
538                                break;
539                            }
540                        }
541                        _ => {
542                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
543                                return Ok(());
544                            }
545                            break;
546                        }
547                    }
548                }
549            }
550        }
551        while let Some((key, value)) = update {
552            if let Update::Set(value) = value {
553                if !f(key, value)? {
554                    return Ok(());
555                }
556            }
557            update = updates.next();
558        }
559        Ok(())
560    }
561
562    /// Applies the function f over all index/value pairs.
563    /// ```rust
564    /// # tokio_test::block_on(async {
565    /// # use linera_views::context::MemoryContext;
566    /// # use linera_views::key_value_store_view::KeyValueStoreView;
567    /// # use linera_views::views::View;
568    /// # let context = MemoryContext::new_for_testing(());
569    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
570    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
571    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
572    /// let mut part_keys = Vec::new();
573    /// view.for_each_index_while(|key| {
574    ///     part_keys.push(key.to_vec());
575    ///     Ok(part_keys.len() < 1)
576    /// })
577    /// .await
578    /// .unwrap();
579    /// assert_eq!(part_keys, vec![vec![0, 1]]);
580    /// # })
581    /// ```
582    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
583    where
584        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
585    {
586        self.for_each_index_value_while(|key, value| {
587            f(key, value)?;
588            Ok(true)
589        })
590        .await
591    }
592
593    /// Returns the list of indices in lexicographic order.
594    /// ```rust
595    /// # tokio_test::block_on(async {
596    /// # use linera_views::context::MemoryContext;
597    /// # use linera_views::key_value_store_view::KeyValueStoreView;
598    /// # use linera_views::views::View;
599    /// # let context = MemoryContext::new_for_testing(());
600    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
601    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
602    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
603    /// let indices = view.indices().await.unwrap();
604    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
605    /// # })
606    /// ```
607    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
608        let mut indices = Vec::new();
609        self.for_each_index(|index| {
610            indices.push(index.to_vec());
611            Ok(())
612        })
613        .await?;
614        Ok(indices)
615    }
616
617    /// Returns the list of indices and values in lexicographic order.
618    /// ```rust
619    /// # tokio_test::block_on(async {
620    /// # use linera_views::context::MemoryContext;
621    /// # use linera_views::key_value_store_view::KeyValueStoreView;
622    /// # use linera_views::views::View;
623    /// # let context = MemoryContext::new_for_testing(());
624    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
625    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
626    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
627    /// let key_values = view.indices().await.unwrap();
628    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
629    /// # })
630    /// ```
631    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
632        let mut index_values = Vec::new();
633        self.for_each_index_value(|index, value| {
634            index_values.push((index.to_vec(), value.to_vec()));
635            Ok(())
636        })
637        .await?;
638        Ok(index_values)
639    }
640
641    /// Returns the number of entries.
642    /// ```rust
643    /// # tokio_test::block_on(async {
644    /// # use linera_views::context::MemoryContext;
645    /// # use linera_views::key_value_store_view::KeyValueStoreView;
646    /// # use linera_views::views::View;
647    /// # let context = MemoryContext::new_for_testing(());
648    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
649    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
650    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
651    /// let count = view.count().await.unwrap();
652    /// assert_eq!(count, 2);
653    /// # })
654    /// ```
655    pub async fn count(&self) -> Result<usize, ViewError> {
656        let mut count = 0;
657        self.for_each_index(|_index| {
658            count += 1;
659            Ok(())
660        })
661        .await?;
662        Ok(count)
663    }
664
665    /// Obtains the value at the given index, if any.
666    /// ```rust
667    /// # tokio_test::block_on(async {
668    /// # use linera_views::context::MemoryContext;
669    /// # use linera_views::key_value_store_view::KeyValueStoreView;
670    /// # use linera_views::views::View;
671    /// # let context = MemoryContext::new_for_testing(());
672    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
673    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
674    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
675    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
676    /// # })
677    /// ```
678    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
679        #[cfg(with_metrics)]
680        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
681        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
682        if let Some(update) = self.updates.get(index) {
683            let value = match update {
684                Update::Removed => None,
685                Update::Set(value) => Some(value.clone()),
686            };
687            return Ok(value);
688        }
689        if self.deletion_set.contains_prefix_of(index) {
690            return Ok(None);
691        }
692        let key = self
693            .context
694            .base_key()
695            .base_tag_index(KeyTag::Index as u8, index);
696        Ok(self.context.store().read_value_bytes(&key).await?)
697    }
698
699    /// Tests whether the store contains a specific index.
700    /// ```rust
701    /// # tokio_test::block_on(async {
702    /// # use linera_views::context::MemoryContext;
703    /// # use linera_views::key_value_store_view::KeyValueStoreView;
704    /// # use linera_views::views::View;
705    /// # let context = MemoryContext::new_for_testing(());
706    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
707    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
708    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
709    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
710    /// # })
711    /// ```
712    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
713        #[cfg(with_metrics)]
714        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
715        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
716        if let Some(update) = self.updates.get(index) {
717            let test = match update {
718                Update::Removed => false,
719                Update::Set(_value) => true,
720            };
721            return Ok(test);
722        }
723        if self.deletion_set.contains_prefix_of(index) {
724            return Ok(false);
725        }
726        let key = self
727            .context
728            .base_key()
729            .base_tag_index(KeyTag::Index as u8, index);
730        Ok(self.context.store().contains_key(&key).await?)
731    }
732
733    /// Tests whether the view contains a range of indices
734    /// ```rust
735    /// # tokio_test::block_on(async {
736    /// # use linera_views::context::MemoryContext;
737    /// # use linera_views::key_value_store_view::KeyValueStoreView;
738    /// # use linera_views::views::View;
739    /// # let context = MemoryContext::new_for_testing(());
740    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
741    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
742    /// let keys = vec![vec![0, 1], vec![0, 2]];
743    /// let results = view.contains_keys(keys).await.unwrap();
744    /// assert_eq!(results, vec![true, false]);
745    /// # })
746    /// ```
747    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
748        #[cfg(with_metrics)]
749        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
750        let mut results = Vec::with_capacity(indices.len());
751        let mut missed_indices = Vec::new();
752        let mut vector_query = Vec::new();
753        for (i, index) in indices.into_iter().enumerate() {
754            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
755            if let Some(update) = self.updates.get(&index) {
756                let value = match update {
757                    Update::Removed => false,
758                    Update::Set(_) => true,
759                };
760                results.push(value);
761            } else {
762                results.push(false);
763                if !self.deletion_set.contains_prefix_of(&index) {
764                    missed_indices.push(i);
765                    let key = self
766                        .context
767                        .base_key()
768                        .base_tag_index(KeyTag::Index as u8, &index);
769                    vector_query.push(key);
770                }
771            }
772        }
773        let values = self.context.store().contains_keys(vector_query).await?;
774        for (i, value) in missed_indices.into_iter().zip(values) {
775            results[i] = value;
776        }
777        Ok(results)
778    }
779
780    /// Obtains the values of a range of indices
781    /// ```rust
782    /// # tokio_test::block_on(async {
783    /// # use linera_views::context::MemoryContext;
784    /// # use linera_views::key_value_store_view::KeyValueStoreView;
785    /// # use linera_views::views::View;
786    /// # let context = MemoryContext::new_for_testing(());
787    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
788    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
789    /// assert_eq!(
790    ///     view.multi_get(vec![vec![0, 1], vec![0, 2]]).await.unwrap(),
791    ///     vec![Some(vec![42]), None]
792    /// );
793    /// # })
794    /// ```
795    pub async fn multi_get(
796        &self,
797        indices: Vec<Vec<u8>>,
798    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
799        #[cfg(with_metrics)]
800        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
801        let mut result = Vec::with_capacity(indices.len());
802        let mut missed_indices = Vec::new();
803        let mut vector_query = Vec::new();
804        for (i, index) in indices.into_iter().enumerate() {
805            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
806            if let Some(update) = self.updates.get(&index) {
807                let value = match update {
808                    Update::Removed => None,
809                    Update::Set(value) => Some(value.clone()),
810                };
811                result.push(value);
812            } else {
813                result.push(None);
814                if !self.deletion_set.contains_prefix_of(&index) {
815                    missed_indices.push(i);
816                    let key = self
817                        .context
818                        .base_key()
819                        .base_tag_index(KeyTag::Index as u8, &index);
820                    vector_query.push(key);
821                }
822            }
823        }
824        let values = self
825            .context
826            .store()
827            .read_multi_values_bytes(vector_query)
828            .await?;
829        for (i, value) in missed_indices.into_iter().zip(values) {
830            result[i] = value;
831        }
832        Ok(result)
833    }
834
835    /// Applies the given batch of `crate::common::WriteOperation`.
836    /// ```rust
837    /// # tokio_test::block_on(async {
838    /// # use linera_views::context::MemoryContext;
839    /// # use linera_views::key_value_store_view::KeyValueStoreView;
840    /// # use linera_views::batch::Batch;
841    /// # use linera_views::views::View;
842    /// # let context = MemoryContext::new_for_testing(());
843    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
844    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
845    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
846    /// let mut batch = Batch::new();
847    /// batch.delete_key_prefix(vec![0]);
848    /// view.write_batch(batch).await.unwrap();
849    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
850    /// assert_eq!(key_values, vec![]);
851    /// # })
852    /// ```
853    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
854        #[cfg(with_metrics)]
855        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
856        *self.hash.get_mut().unwrap() = None;
857        let max_key_size = self.max_key_size();
858        for operation in batch.operations {
859            match operation {
860                WriteOperation::Delete { key } => {
861                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
862                    if let Some(value) = self.sizes.get(&key).await? {
863                        let entry_size = SizeData {
864                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
865                            value,
866                        };
867                        self.total_size.sub_assign(entry_size);
868                    }
869                    self.sizes.remove(key.clone());
870                    if self.deletion_set.contains_prefix_of(&key) {
871                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
872                        self.updates.remove(&key);
873                    } else {
874                        self.updates.insert(key, Update::Removed);
875                    }
876                }
877                WriteOperation::Put { key, value } => {
878                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
879                    let entry_size = SizeData {
880                        key: key.len() as u32,
881                        value: value.len() as u32,
882                    };
883                    self.total_size.add_assign(entry_size)?;
884                    if let Some(value) = self.sizes.get(&key).await? {
885                        let entry_size = SizeData {
886                            key: key.len() as u32,
887                            value,
888                        };
889                        self.total_size.sub_assign(entry_size);
890                    }
891                    self.sizes.insert(key.clone(), entry_size.value);
892                    self.updates.insert(key, Update::Set(value));
893                }
894                WriteOperation::DeletePrefix { key_prefix } => {
895                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
896                    let key_list = self
897                        .updates
898                        .range(get_interval(key_prefix.clone()))
899                        .map(|x| x.0.to_vec())
900                        .collect::<Vec<_>>();
901                    for key in key_list {
902                        self.updates.remove(&key);
903                    }
904                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
905                    for (key, value) in key_values {
906                        let entry_size = SizeData {
907                            key: key.len() as u32,
908                            value,
909                        };
910                        self.total_size.sub_assign(entry_size);
911                        self.sizes.remove(key);
912                    }
913                    self.sizes.remove_by_prefix(key_prefix.clone());
914                    self.deletion_set.insert_key_prefix(key_prefix);
915                }
916            }
917        }
918        Ok(())
919    }
920
921    /// Sets or inserts a value.
922    /// ```rust
923    /// # tokio_test::block_on(async {
924    /// # use linera_views::context::MemoryContext;
925    /// # use linera_views::key_value_store_view::KeyValueStoreView;
926    /// # use linera_views::views::View;
927    /// # let context = MemoryContext::new_for_testing(());
928    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
929    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
930    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
931    /// # })
932    /// ```
933    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
934        let mut batch = Batch::new();
935        batch.put_key_value_bytes(index, value);
936        self.write_batch(batch).await
937    }
938
939    /// Removes a value. If absent then the action has no effect.
940    /// ```rust
941    /// # tokio_test::block_on(async {
942    /// # use linera_views::context::MemoryContext;
943    /// # use linera_views::key_value_store_view::KeyValueStoreView;
944    /// # use linera_views::views::View;
945    /// # let context = MemoryContext::new_for_testing(());
946    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
947    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
948    /// view.remove(vec![0, 1]).await.unwrap();
949    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
950    /// # })
951    /// ```
952    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
953        let mut batch = Batch::new();
954        batch.delete_key(index);
955        self.write_batch(batch).await
956    }
957
958    /// Deletes a key prefix.
959    /// ```rust
960    /// # tokio_test::block_on(async {
961    /// # use linera_views::context::MemoryContext;
962    /// # use linera_views::key_value_store_view::KeyValueStoreView;
963    /// # use linera_views::views::View;
964    /// # let context = MemoryContext::new_for_testing(());
965    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
966    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
967    /// view.remove_by_prefix(vec![0]).await.unwrap();
968    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
969    /// # })
970    /// ```
971    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
972        let mut batch = Batch::new();
973        batch.delete_key_prefix(key_prefix);
974        self.write_batch(batch).await
975    }
976
977    /// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
978    /// ```rust
979    /// # tokio_test::block_on(async {
980    /// # use linera_views::context::MemoryContext;
981    /// # use linera_views::key_value_store_view::KeyValueStoreView;
982    /// # use linera_views::views::View;
983    /// # let context = MemoryContext::new_for_testing(());
984    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
985    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
986    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
987    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
988    /// assert_eq!(keys, vec![vec![1]]);
989    /// # })
990    /// ```
991    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
992        #[cfg(with_metrics)]
993        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
994        ensure!(
995            key_prefix.len() <= self.max_key_size(),
996            ViewError::KeyTooLong
997        );
998        let len = key_prefix.len();
999        let key_prefix_full = self
1000            .context
1001            .base_key()
1002            .base_tag_index(KeyTag::Index as u8, key_prefix);
1003        let mut keys = Vec::new();
1004        let key_prefix_upper = get_upper_bound(key_prefix);
1005        let mut updates = self
1006            .updates
1007            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1008        let mut update = updates.next();
1009        if !self.deletion_set.delete_storage_first {
1010            let mut suffix_closed_set =
1011                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1012            for key in self
1013                .context
1014                .store()
1015                .find_keys_by_prefix(&key_prefix_full)
1016                .await?
1017            {
1018                loop {
1019                    match update {
1020                        Some((update_key, update_value))
1021                            if &update_key[len..] <= key.as_slice() =>
1022                        {
1023                            if let Update::Set(_) = update_value {
1024                                keys.push(update_key[len..].to_vec());
1025                            }
1026                            update = updates.next();
1027                            if update_key[len..] == key[..] {
1028                                break;
1029                            }
1030                        }
1031                        _ => {
1032                            let mut key_with_prefix = key_prefix.to_vec();
1033                            key_with_prefix.extend_from_slice(&key);
1034                            if !suffix_closed_set.find_key(&key_with_prefix) {
1035                                keys.push(key);
1036                            }
1037                            break;
1038                        }
1039                    }
1040                }
1041            }
1042        }
1043        while let Some((update_key, update_value)) = update {
1044            if let Update::Set(_) = update_value {
1045                let update_key = update_key[len..].to_vec();
1046                keys.push(update_key);
1047            }
1048            update = updates.next();
1049        }
1050        Ok(keys)
1051    }
1052
1053    /// Iterates over all the key-value pairs, for keys matching the given prefix. The
1054    /// prefix is not included in the returned keys.
1055    /// ```rust
1056    /// # tokio_test::block_on(async {
1057    /// # use linera_views::context::MemoryContext;
1058    /// # use linera_views::key_value_store_view::KeyValueStoreView;
1059    /// # use linera_views::views::View;
1060    /// # let context = MemoryContext::new_for_testing(());
1061    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1062    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1063    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1064    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
1065    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
1066    /// # })
1067    /// ```
1068    pub async fn find_key_values_by_prefix(
1069        &self,
1070        key_prefix: &[u8],
1071    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
1072        #[cfg(with_metrics)]
1073        let _latency =
1074            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
1075        ensure!(
1076            key_prefix.len() <= self.max_key_size(),
1077            ViewError::KeyTooLong
1078        );
1079        let len = key_prefix.len();
1080        let key_prefix_full = self
1081            .context
1082            .base_key()
1083            .base_tag_index(KeyTag::Index as u8, key_prefix);
1084        let mut key_values = Vec::new();
1085        let key_prefix_upper = get_upper_bound(key_prefix);
1086        let mut updates = self
1087            .updates
1088            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1089        let mut update = updates.next();
1090        if !self.deletion_set.delete_storage_first {
1091            let mut suffix_closed_set =
1092                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1093            for entry in self
1094                .context
1095                .store()
1096                .find_key_values_by_prefix(&key_prefix_full)
1097                .await?
1098            {
1099                let (key, value) = entry;
1100                loop {
1101                    match update {
1102                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
1103                            if let Update::Set(update_value) = update_value {
1104                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1105                                key_values.push(key_value);
1106                            }
1107                            update = updates.next();
1108                            if update_key[len..] == key[..] {
1109                                break;
1110                            }
1111                        }
1112                        _ => {
1113                            let mut key_with_prefix = key_prefix.to_vec();
1114                            key_with_prefix.extend_from_slice(&key);
1115                            if !suffix_closed_set.find_key(&key_with_prefix) {
1116                                key_values.push((key, value));
1117                            }
1118                            break;
1119                        }
1120                    }
1121                }
1122            }
1123        }
1124        while let Some((update_key, update_value)) = update {
1125            if let Update::Set(update_value) = update_value {
1126                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1127                key_values.push(key_value);
1128            }
1129            update = updates.next();
1130        }
1131        Ok(key_values)
1132    }
1133
1134    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1135        #[cfg(with_metrics)]
1136        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1137        let mut hasher = sha3::Sha3_256::default();
1138        let mut count = 0u32;
1139        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1140            count += 1;
1141            hasher.update_with_bytes(index)?;
1142            hasher.update_with_bytes(value)?;
1143            Ok(())
1144        })
1145        .await?;
1146        hasher.update_with_bcs_bytes(&count)?;
1147        Ok(hasher.finalize())
1148    }
1149}
1150
1151impl<C: Context> HashableView for KeyValueStoreView<C> {
1152    type Hasher = sha3::Sha3_256;
1153
1154    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1155        let hash = *self.hash.get_mut().unwrap();
1156        match hash {
1157            Some(hash) => Ok(hash),
1158            None => {
1159                let new_hash = self.compute_hash().await?;
1160                let hash = self.hash.get_mut().unwrap();
1161                *hash = Some(new_hash);
1162                Ok(new_hash)
1163            }
1164        }
1165    }
1166
1167    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1168        let hash = *self.hash.lock().unwrap();
1169        match hash {
1170            Some(hash) => Ok(hash),
1171            None => {
1172                let new_hash = self.compute_hash().await?;
1173                let mut hash = self.hash.lock().unwrap();
1174                *hash = Some(new_hash);
1175                Ok(new_hash)
1176            }
1177        }
1178    }
1179}
1180
1181/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
1182#[cfg(with_testing)]
1183#[derive(Debug, Clone)]
1184pub struct ViewContainer<C> {
1185    view: Arc<RwLock<KeyValueStoreView<C>>>,
1186}
1187
1188#[cfg(with_testing)]
1189impl<C> WithError for ViewContainer<C> {
1190    type Error = ViewContainerError;
1191}
1192
1193#[cfg(with_testing)]
1194/// The error type for [`ViewContainer`] operations.
1195#[derive(Error, Debug)]
1196pub enum ViewContainerError {
1197    /// View error.
1198    #[error(transparent)]
1199    ViewError(#[from] ViewError),
1200
1201    /// BCS serialization error.
1202    #[error(transparent)]
1203    BcsError(#[from] bcs::Error),
1204}
1205
1206#[cfg(with_testing)]
1207impl KeyValueStoreError for ViewContainerError {
1208    const BACKEND: &'static str = "view_container";
1209}
1210
1211#[cfg(with_testing)]
1212impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
1213    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
1214
1215    fn max_stream_queries(&self) -> usize {
1216        1
1217    }
1218
1219    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
1220        let view = self.view.read().await;
1221        Ok(view.get(key).await?)
1222    }
1223
1224    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
1225        let view = self.view.read().await;
1226        Ok(view.contains_key(key).await?)
1227    }
1228
1229    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
1230        let view = self.view.read().await;
1231        Ok(view.contains_keys(keys).await?)
1232    }
1233
1234    async fn read_multi_values_bytes(
1235        &self,
1236        keys: Vec<Vec<u8>>,
1237    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
1238        let view = self.view.read().await;
1239        Ok(view.multi_get(keys).await?)
1240    }
1241
1242    async fn find_keys_by_prefix(
1243        &self,
1244        key_prefix: &[u8],
1245    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
1246        let view = self.view.read().await;
1247        Ok(view.find_keys_by_prefix(key_prefix).await?)
1248    }
1249
1250    async fn find_key_values_by_prefix(
1251        &self,
1252        key_prefix: &[u8],
1253    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
1254        let view = self.view.read().await;
1255        Ok(view.find_key_values_by_prefix(key_prefix).await?)
1256    }
1257}
1258
1259#[cfg(with_testing)]
1260impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
1261    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;
1262
1263    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
1264        let mut view = self.view.write().await;
1265        view.write_batch(batch).await?;
1266        let mut batch = Batch::new();
1267        view.flush(&mut batch)?;
1268        view.context()
1269            .store()
1270            .write_batch(batch)
1271            .await
1272            .map_err(ViewError::from)?;
1273        Ok(())
1274    }
1275
1276    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
1277        Ok(())
1278    }
1279}
1280
1281#[cfg(with_testing)]
1282impl<C: Context> ViewContainer<C> {
1283    /// Creates a [`ViewContainer`].
1284    pub async fn new(context: C) -> Result<Self, ViewError> {
1285        let view = KeyValueStoreView::load(context).await?;
1286        let view = Arc::new(RwLock::new(view));
1287        Ok(Self { view })
1288    }
1289}