linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! We implement two types:
5//! 1) The first type `KeyValueStoreView` implements View and the function of `KeyValueStore`.
6//!
7//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
8//!    * The `Clone` trait
9//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
10//!    * a `write_batch` that writes in the context instead of writing of the view.
11//!
12//! Currently, that second type is only used for tests.
13//!
14//! Key tags to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24    batch::{Batch, WriteOperation},
25    common::{
26        from_bytes_option, from_bytes_option_or_default, get_interval, get_upper_bound,
27        DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28    },
29    context::Context,
30    map_view::ByteMapView,
31    store::{KeyIterable, KeyValueIterable, ReadableKeyValueStore},
32    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
33};
34
35#[cfg(with_metrics)]
36mod metrics {
37    use std::sync::LazyLock;
38
39    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
40    use prometheus::HistogramVec;
41
42    /// The latency of hash computation
43    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
44        register_histogram_vec(
45            "key_value_store_view_hash_latency",
46            "KeyValueStoreView hash latency",
47            &[],
48            exponential_bucket_latencies(5.0),
49        )
50    });
51
52    /// The latency of get operation
53    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
54        register_histogram_vec(
55            "key_value_store_view_get_latency",
56            "KeyValueStoreView get latency",
57            &[],
58            exponential_bucket_latencies(5.0),
59        )
60    });
61
62    /// The latency of multi get
63    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
64        LazyLock::new(|| {
65            register_histogram_vec(
66                "key_value_store_view_multi_get_latency",
67                "KeyValueStoreView multi get latency",
68                &[],
69                exponential_bucket_latencies(5.0),
70            )
71        });
72
73    /// The latency of contains key
74    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
75        LazyLock::new(|| {
76            register_histogram_vec(
77                "key_value_store_view_contains_key_latency",
78                "KeyValueStoreView contains key latency",
79                &[],
80                exponential_bucket_latencies(5.0),
81            )
82        });
83
84    /// The latency of contains keys
85    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
86        LazyLock::new(|| {
87            register_histogram_vec(
88                "key_value_store_view_contains_keys_latency",
89                "KeyValueStoreView contains keys latency",
90                &[],
91                exponential_bucket_latencies(5.0),
92            )
93        });
94
95    /// The latency of find keys by prefix operation
96    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
97        LazyLock::new(|| {
98            register_histogram_vec(
99                "key_value_store_view_find_keys_by_prefix_latency",
100                "KeyValueStoreView find keys by prefix latency",
101                &[],
102                exponential_bucket_latencies(5.0),
103            )
104        });
105
106    /// The latency of find key values by prefix operation
107    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
108        LazyLock::new(|| {
109            register_histogram_vec(
110                "key_value_store_view_find_key_values_by_prefix_latency",
111                "KeyValueStoreView find key values by prefix latency",
112                &[],
113                exponential_bucket_latencies(5.0),
114            )
115        });
116
117    /// The latency of write batch operation
118    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
119        LazyLock::new(|| {
120            register_histogram_vec(
121                "key_value_store_view_write_batch_latency",
122                "KeyValueStoreView write batch latency",
123                &[],
124                exponential_bucket_latencies(5.0),
125            )
126        });
127}
128
129#[cfg(with_testing)]
130use {
131    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
132    async_lock::RwLock,
133    std::sync::Arc,
134    thiserror::Error,
135};
136
137#[repr(u8)]
138enum KeyTag {
139    /// Prefix for the indices of the view.
140    Index = MIN_VIEW_TAG,
141    /// The total stored size
142    TotalSize,
143    /// The prefix where the sizes are being stored
144    Sizes,
145    /// Prefix for the hash.
146    Hash,
147}
148
149/// A pair containing the key and value size.
150#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
151pub struct SizeData {
152    /// The size of the key
153    pub key: u32,
154    /// The size of the value
155    pub value: u32,
156}
157
158impl SizeData {
159    /// Sums both terms
160    pub fn sum(&mut self) -> u32 {
161        self.key + self.value
162    }
163
164    /// Adds a size to `self`
165    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
166        self.key = self
167            .key
168            .checked_add(size.key)
169            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
170        self.value = self
171            .value
172            .checked_add(size.value)
173            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
174        Ok(())
175    }
176
177    /// Subtracts a size from `self`
178    pub fn sub_assign(&mut self, size: SizeData) {
179        self.key -= size.key;
180        self.value -= size.value;
181    }
182}
183
184/// A view that represents the functions of `KeyValueStore`.
185///
186/// Comment on the data set:
187/// In order to work, the view needs to store the updates and deleted prefixes.
188/// The updates and deleted prefixes have to be coherent. This means:
189/// * If an index is deleted by one in deleted prefixes then it should not be present
190///   in the updates at all.
191/// * [`DeletePrefix::key_prefix`][entry1] should not dominate anyone. That is if we have `[0,2]`
192///   then we should not have `[0,2,3]` since it would be dominated by the preceding.
193///
194/// With that we have:
195/// * in order to test if an `index` is deleted by a prefix we compute the highest deleted prefix `dp`
196///   such that `dp <= index`.
197///   If `dp` is indeed a prefix then we conclude that `index` is deleted, otherwise not.
198///   The no domination is essential here.
199///
200/// [entry1]: crate::batch::WriteOperation::DeletePrefix
201#[derive(Debug)]
202pub struct KeyValueStoreView<C> {
203    context: C,
204    deletion_set: DeletionSet,
205    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
206    stored_total_size: SizeData,
207    total_size: SizeData,
208    sizes: ByteMapView<C, u32>,
209    stored_hash: Option<HasherOutput>,
210    hash: Mutex<Option<HasherOutput>>,
211}
212
213impl<C: Context> View for KeyValueStoreView<C> {
214    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;
215
216    type Context = C;
217
218    fn context(&self) -> &C {
219        &self.context
220    }
221
222    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
223        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
224        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
225        let mut v = vec![key_hash, key_total_size];
226        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
227        let context_sizes = context.clone_with_base_key(base_key);
228        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
229        Ok(v)
230    }
231
232    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
233        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
234        let total_size =
235            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
236        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
237        let context_sizes = context.clone_with_base_key(base_key);
238        let sizes = ByteMapView::post_load(
239            context_sizes,
240            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
241        )?;
242        Ok(Self {
243            context,
244            deletion_set: DeletionSet::new(),
245            updates: BTreeMap::new(),
246            stored_total_size: total_size,
247            total_size,
248            sizes,
249            stored_hash: hash,
250            hash: Mutex::new(hash),
251        })
252    }
253
254    async fn load(context: C) -> Result<Self, ViewError> {
255        let keys = Self::pre_load(&context)?;
256        let values = context.store().read_multi_values_bytes(keys).await?;
257        Self::post_load(context, &values)
258    }
259
260    fn rollback(&mut self) {
261        self.deletion_set.rollback();
262        self.updates.clear();
263        self.total_size = self.stored_total_size;
264        self.sizes.rollback();
265        *self.hash.get_mut().unwrap() = self.stored_hash;
266    }
267
268    async fn has_pending_changes(&self) -> bool {
269        if self.deletion_set.has_pending_changes() {
270            return true;
271        }
272        if !self.updates.is_empty() {
273            return true;
274        }
275        if self.stored_total_size != self.total_size {
276            return true;
277        }
278        if self.sizes.has_pending_changes().await {
279            return true;
280        }
281        let hash = self.hash.lock().unwrap();
282        self.stored_hash != *hash
283    }
284
285    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
286        let mut delete_view = false;
287        if self.deletion_set.delete_storage_first {
288            delete_view = true;
289            self.stored_total_size = SizeData::default();
290            batch.delete_key_prefix(self.context.base_key().bytes.clone());
291            for (index, update) in mem::take(&mut self.updates) {
292                if let Update::Set(value) = update {
293                    let key = self
294                        .context
295                        .base_key()
296                        .base_tag_index(KeyTag::Index as u8, &index);
297                    batch.put_key_value_bytes(key, value);
298                    delete_view = false;
299                }
300            }
301            self.stored_hash = None
302        } else {
303            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
304                let key = self
305                    .context
306                    .base_key()
307                    .base_tag_index(KeyTag::Index as u8, &index);
308                batch.delete_key_prefix(key);
309            }
310            for (index, update) in mem::take(&mut self.updates) {
311                let key = self
312                    .context
313                    .base_key()
314                    .base_tag_index(KeyTag::Index as u8, &index);
315                match update {
316                    Update::Removed => batch.delete_key(key),
317                    Update::Set(value) => batch.put_key_value_bytes(key, value),
318                }
319            }
320        }
321        self.sizes.flush(batch)?;
322        let hash = *self.hash.get_mut().unwrap();
323        if self.stored_hash != hash {
324            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
325            match hash {
326                None => batch.delete_key(key),
327                Some(hash) => batch.put_key_value(key, &hash)?,
328            }
329            self.stored_hash = hash;
330        }
331        if self.stored_total_size != self.total_size {
332            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
333            batch.put_key_value(key, &self.total_size)?;
334            self.stored_total_size = self.total_size;
335        }
336        self.deletion_set.delete_storage_first = false;
337        Ok(delete_view)
338    }
339
340    fn clear(&mut self) {
341        self.deletion_set.clear();
342        self.updates.clear();
343        self.total_size = SizeData::default();
344        self.sizes.clear();
345        *self.hash.get_mut().unwrap() = None;
346    }
347}
348
349impl<C: Context> ClonableView for KeyValueStoreView<C> {
350    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
351        Ok(KeyValueStoreView {
352            context: self.context.clone(),
353            deletion_set: self.deletion_set.clone(),
354            updates: self.updates.clone(),
355            stored_total_size: self.stored_total_size,
356            total_size: self.total_size,
357            sizes: self.sizes.clone_unchecked()?,
358            stored_hash: self.stored_hash,
359            hash: Mutex::new(*self.hash.get_mut().unwrap()),
360        })
361    }
362}
363
364impl<C: Context> KeyValueStoreView<C> {
365    fn max_key_size(&self) -> usize {
366        let prefix_len = self.context.base_key().bytes.len();
367        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
368    }
369
370    /// Getting the total sizes that will be used for keys and values when stored
371    /// ```rust
372    /// # tokio_test::block_on(async {
373    /// # use linera_views::context::MemoryContext;
374    /// # use linera_views::key_value_store_view::{KeyValueStoreView, SizeData};
375    /// # use linera_views::views::View;
376    /// # let context = MemoryContext::new_for_testing(());
377    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
378    /// let total_size = view.total_size();
379    /// assert_eq!(total_size, SizeData::default());
380    /// # })
381    /// ```
382    pub fn total_size(&self) -> SizeData {
383        self.total_size
384    }
385
386    /// Applies the function f over all indices. If the function f returns
387    /// false, then the loop ends prematurely.
388    /// ```rust
389    /// # tokio_test::block_on(async {
390    /// # use linera_views::context::MemoryContext;
391    /// # use linera_views::key_value_store_view::KeyValueStoreView;
392    /// # use linera_views::views::View;
393    /// # let context = MemoryContext::new_for_testing(());
394    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
395    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
396    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
397    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
398    /// let mut count = 0;
399    /// view.for_each_index_while(|_key| {
400    ///     count += 1;
401    ///     Ok(count < 2)
402    /// })
403    /// .await
404    /// .unwrap();
405    /// assert_eq!(count, 2);
406    /// # })
407    /// ```
408    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
409    where
410        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
411    {
412        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
413        let mut updates = self.updates.iter();
414        let mut update = updates.next();
415        if !self.deletion_set.delete_storage_first {
416            let mut suffix_closed_set =
417                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
418            for index in self
419                .context
420                .store()
421                .find_keys_by_prefix(&key_prefix)
422                .await?
423                .iterator()
424            {
425                let index = index?;
426                loop {
427                    match update {
428                        Some((key, value)) if key.as_slice() <= index => {
429                            if let Update::Set(_) = value {
430                                if !f(key)? {
431                                    return Ok(());
432                                }
433                            }
434                            update = updates.next();
435                            if key == index {
436                                break;
437                            }
438                        }
439                        _ => {
440                            if !suffix_closed_set.find_key(index) && !f(index)? {
441                                return Ok(());
442                            }
443                            break;
444                        }
445                    }
446                }
447            }
448        }
449        while let Some((key, value)) = update {
450            if let Update::Set(_) = value {
451                if !f(key)? {
452                    return Ok(());
453                }
454            }
455            update = updates.next();
456        }
457        Ok(())
458    }
459
460    /// Applies the function f over all indices.
461    /// ```rust
462    /// # tokio_test::block_on(async {
463    /// # use linera_views::context::MemoryContext;
464    /// # use linera_views::key_value_store_view::KeyValueStoreView;
465    /// # use linera_views::views::View;
466    /// # let context = MemoryContext::new_for_testing(());
467    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
468    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
469    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
470    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
471    /// let mut count = 0;
472    /// view.for_each_index(|_key| {
473    ///     count += 1;
474    ///     Ok(())
475    /// })
476    /// .await
477    /// .unwrap();
478    /// assert_eq!(count, 3);
479    /// # })
480    /// ```
481    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
482    where
483        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
484    {
485        self.for_each_index_while(|key| {
486            f(key)?;
487            Ok(true)
488        })
489        .await
490    }
491
492    /// Applies the function f over all index/value pairs.
493    /// If the function f returns false then the loop ends prematurely.
494    /// ```rust
495    /// # tokio_test::block_on(async {
496    /// # use linera_views::context::MemoryContext;
497    /// # use linera_views::key_value_store_view::KeyValueStoreView;
498    /// # use linera_views::views::View;
499    /// # let context = MemoryContext::new_for_testing(());
500    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
501    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
502    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
503    /// let mut values = Vec::new();
504    /// view.for_each_index_value_while(|_key, value| {
505    ///     values.push(value.to_vec());
506    ///     Ok(values.len() < 1)
507    /// })
508    /// .await
509    /// .unwrap();
510    /// assert_eq!(values, vec![vec![0]]);
511    /// # })
512    /// ```
513    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
514    where
515        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
516    {
517        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
518        let mut updates = self.updates.iter();
519        let mut update = updates.next();
520        if !self.deletion_set.delete_storage_first {
521            let mut suffix_closed_set =
522                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
523            for entry in self
524                .context
525                .store()
526                .find_key_values_by_prefix(&key_prefix)
527                .await?
528                .iterator()
529            {
530                let (index, index_val) = entry?;
531                loop {
532                    match update {
533                        Some((key, value)) if key.as_slice() <= index => {
534                            if let Update::Set(value) = value {
535                                if !f(key, value)? {
536                                    return Ok(());
537                                }
538                            }
539                            update = updates.next();
540                            if key == index {
541                                break;
542                            }
543                        }
544                        _ => {
545                            if !suffix_closed_set.find_key(index) && !f(index, index_val)? {
546                                return Ok(());
547                            }
548                            break;
549                        }
550                    }
551                }
552            }
553        }
554        while let Some((key, value)) = update {
555            if let Update::Set(value) = value {
556                if !f(key, value)? {
557                    return Ok(());
558                }
559            }
560            update = updates.next();
561        }
562        Ok(())
563    }
564
565    /// Applies the function f over all index/value pairs.
566    /// ```rust
567    /// # tokio_test::block_on(async {
568    /// # use linera_views::context::MemoryContext;
569    /// # use linera_views::key_value_store_view::KeyValueStoreView;
570    /// # use linera_views::views::View;
571    /// # let context = MemoryContext::new_for_testing(());
572    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
573    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
574    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
575    /// let mut part_keys = Vec::new();
576    /// view.for_each_index_while(|key| {
577    ///     part_keys.push(key.to_vec());
578    ///     Ok(part_keys.len() < 1)
579    /// })
580    /// .await
581    /// .unwrap();
582    /// assert_eq!(part_keys, vec![vec![0, 1]]);
583    /// # })
584    /// ```
585    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
586    where
587        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
588    {
589        self.for_each_index_value_while(|key, value| {
590            f(key, value)?;
591            Ok(true)
592        })
593        .await
594    }
595
596    /// Returns the list of indices in lexicographic order.
597    /// ```rust
598    /// # tokio_test::block_on(async {
599    /// # use linera_views::context::MemoryContext;
600    /// # use linera_views::key_value_store_view::KeyValueStoreView;
601    /// # use linera_views::views::View;
602    /// # let context = MemoryContext::new_for_testing(());
603    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
604    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
605    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
606    /// let indices = view.indices().await.unwrap();
607    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
608    /// # })
609    /// ```
610    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
611        let mut indices = Vec::new();
612        self.for_each_index(|index| {
613            indices.push(index.to_vec());
614            Ok(())
615        })
616        .await?;
617        Ok(indices)
618    }
619
620    /// Returns the list of indices and values in lexicographic order.
621    /// ```rust
622    /// # tokio_test::block_on(async {
623    /// # use linera_views::context::MemoryContext;
624    /// # use linera_views::key_value_store_view::KeyValueStoreView;
625    /// # use linera_views::views::View;
626    /// # let context = MemoryContext::new_for_testing(());
627    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
628    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
629    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
630    /// let key_values = view.indices().await.unwrap();
631    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
632    /// # })
633    /// ```
634    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
635        let mut index_values = Vec::new();
636        self.for_each_index_value(|index, value| {
637            index_values.push((index.to_vec(), value.to_vec()));
638            Ok(())
639        })
640        .await?;
641        Ok(index_values)
642    }
643
644    /// Returns the number of entries.
645    /// ```rust
646    /// # tokio_test::block_on(async {
647    /// # use linera_views::context::MemoryContext;
648    /// # use linera_views::key_value_store_view::KeyValueStoreView;
649    /// # use linera_views::views::View;
650    /// # let context = MemoryContext::new_for_testing(());
651    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
652    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
653    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
654    /// let count = view.count().await.unwrap();
655    /// assert_eq!(count, 2);
656    /// # })
657    /// ```
658    pub async fn count(&self) -> Result<usize, ViewError> {
659        let mut count = 0;
660        self.for_each_index(|_index| {
661            count += 1;
662            Ok(())
663        })
664        .await?;
665        Ok(count)
666    }
667
668    /// Obtains the value at the given index, if any.
669    /// ```rust
670    /// # tokio_test::block_on(async {
671    /// # use linera_views::context::MemoryContext;
672    /// # use linera_views::key_value_store_view::KeyValueStoreView;
673    /// # use linera_views::views::View;
674    /// # let context = MemoryContext::new_for_testing(());
675    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
676    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
677    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
678    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
679    /// # })
680    /// ```
681    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
682        #[cfg(with_metrics)]
683        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
684        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
685        if let Some(update) = self.updates.get(index) {
686            let value = match update {
687                Update::Removed => None,
688                Update::Set(value) => Some(value.clone()),
689            };
690            return Ok(value);
691        }
692        if self.deletion_set.contains_prefix_of(index) {
693            return Ok(None);
694        }
695        let key = self
696            .context
697            .base_key()
698            .base_tag_index(KeyTag::Index as u8, index);
699        Ok(self.context.store().read_value_bytes(&key).await?)
700    }
701
702    /// Tests whether the store contains a specific index.
703    /// ```rust
704    /// # tokio_test::block_on(async {
705    /// # use linera_views::context::MemoryContext;
706    /// # use linera_views::key_value_store_view::KeyValueStoreView;
707    /// # use linera_views::views::View;
708    /// # let context = MemoryContext::new_for_testing(());
709    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
710    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
711    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
712    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
713    /// # })
714    /// ```
715    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
716        #[cfg(with_metrics)]
717        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
718        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
719        if let Some(update) = self.updates.get(index) {
720            let test = match update {
721                Update::Removed => false,
722                Update::Set(_value) => true,
723            };
724            return Ok(test);
725        }
726        if self.deletion_set.contains_prefix_of(index) {
727            return Ok(false);
728        }
729        let key = self
730            .context
731            .base_key()
732            .base_tag_index(KeyTag::Index as u8, index);
733        Ok(self.context.store().contains_key(&key).await?)
734    }
735
736    /// Tests whether the view contains a range of indices
737    /// ```rust
738    /// # tokio_test::block_on(async {
739    /// # use linera_views::context::MemoryContext;
740    /// # use linera_views::key_value_store_view::KeyValueStoreView;
741    /// # use linera_views::views::View;
742    /// # let context = MemoryContext::new_for_testing(());
743    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
744    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
745    /// let keys = vec![vec![0, 1], vec![0, 2]];
746    /// let results = view.contains_keys(keys).await.unwrap();
747    /// assert_eq!(results, vec![true, false]);
748    /// # })
749    /// ```
750    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
751        #[cfg(with_metrics)]
752        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
753        let mut results = Vec::with_capacity(indices.len());
754        let mut missed_indices = Vec::new();
755        let mut vector_query = Vec::new();
756        for (i, index) in indices.into_iter().enumerate() {
757            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
758            if let Some(update) = self.updates.get(&index) {
759                let value = match update {
760                    Update::Removed => false,
761                    Update::Set(_) => true,
762                };
763                results.push(value);
764            } else {
765                results.push(false);
766                if !self.deletion_set.contains_prefix_of(&index) {
767                    missed_indices.push(i);
768                    let key = self
769                        .context
770                        .base_key()
771                        .base_tag_index(KeyTag::Index as u8, &index);
772                    vector_query.push(key);
773                }
774            }
775        }
776        let values = self.context.store().contains_keys(vector_query).await?;
777        for (i, value) in missed_indices.into_iter().zip(values) {
778            results[i] = value;
779        }
780        Ok(results)
781    }
782
783    /// Obtains the values of a range of indices
784    /// ```rust
785    /// # tokio_test::block_on(async {
786    /// # use linera_views::context::MemoryContext;
787    /// # use linera_views::key_value_store_view::KeyValueStoreView;
788    /// # use linera_views::views::View;
789    /// # let context = MemoryContext::new_for_testing(());
790    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
791    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
792    /// assert_eq!(
793    ///     view.multi_get(vec![vec![0, 1], vec![0, 2]]).await.unwrap(),
794    ///     vec![Some(vec![42]), None]
795    /// );
796    /// # })
797    /// ```
798    pub async fn multi_get(
799        &self,
800        indices: Vec<Vec<u8>>,
801    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
802        #[cfg(with_metrics)]
803        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
804        let mut result = Vec::with_capacity(indices.len());
805        let mut missed_indices = Vec::new();
806        let mut vector_query = Vec::new();
807        for (i, index) in indices.into_iter().enumerate() {
808            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
809            if let Some(update) = self.updates.get(&index) {
810                let value = match update {
811                    Update::Removed => None,
812                    Update::Set(value) => Some(value.clone()),
813                };
814                result.push(value);
815            } else {
816                result.push(None);
817                if !self.deletion_set.contains_prefix_of(&index) {
818                    missed_indices.push(i);
819                    let key = self
820                        .context
821                        .base_key()
822                        .base_tag_index(KeyTag::Index as u8, &index);
823                    vector_query.push(key);
824                }
825            }
826        }
827        let values = self
828            .context
829            .store()
830            .read_multi_values_bytes(vector_query)
831            .await?;
832        for (i, value) in missed_indices.into_iter().zip(values) {
833            result[i] = value;
834        }
835        Ok(result)
836    }
837
838    /// Applies the given batch of `crate::common::WriteOperation`.
839    /// ```rust
840    /// # tokio_test::block_on(async {
841    /// # use linera_views::context::MemoryContext;
842    /// # use linera_views::key_value_store_view::KeyValueStoreView;
843    /// # use linera_views::batch::Batch;
844    /// # use linera_views::views::View;
845    /// # let context = MemoryContext::new_for_testing(());
846    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
847    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
848    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
849    /// let mut batch = Batch::new();
850    /// batch.delete_key_prefix(vec![0]);
851    /// view.write_batch(batch).await.unwrap();
852    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
853    /// assert_eq!(key_values, vec![]);
854    /// # })
855    /// ```
856    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
857        #[cfg(with_metrics)]
858        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
859        *self.hash.get_mut().unwrap() = None;
860        let max_key_size = self.max_key_size();
861        for operation in batch.operations {
862            match operation {
863                WriteOperation::Delete { key } => {
864                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
865                    if let Some(value) = self.sizes.get(&key).await? {
866                        let entry_size = SizeData {
867                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
868                            value,
869                        };
870                        self.total_size.sub_assign(entry_size);
871                    }
872                    self.sizes.remove(key.clone());
873                    if self.deletion_set.contains_prefix_of(&key) {
874                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
875                        self.updates.remove(&key);
876                    } else {
877                        self.updates.insert(key, Update::Removed);
878                    }
879                }
880                WriteOperation::Put { key, value } => {
881                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
882                    let entry_size = SizeData {
883                        key: key.len() as u32,
884                        value: value.len() as u32,
885                    };
886                    self.total_size.add_assign(entry_size)?;
887                    if let Some(value) = self.sizes.get(&key).await? {
888                        let entry_size = SizeData {
889                            key: key.len() as u32,
890                            value,
891                        };
892                        self.total_size.sub_assign(entry_size);
893                    }
894                    self.sizes.insert(key.clone(), entry_size.value);
895                    self.updates.insert(key, Update::Set(value));
896                }
897                WriteOperation::DeletePrefix { key_prefix } => {
898                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
899                    let key_list = self
900                        .updates
901                        .range(get_interval(key_prefix.clone()))
902                        .map(|x| x.0.to_vec())
903                        .collect::<Vec<_>>();
904                    for key in key_list {
905                        self.updates.remove(&key);
906                    }
907                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
908                    for (key, value) in key_values {
909                        let entry_size = SizeData {
910                            key: key.len() as u32,
911                            value,
912                        };
913                        self.total_size.sub_assign(entry_size);
914                        self.sizes.remove(key);
915                    }
916                    self.sizes.remove_by_prefix(key_prefix.clone());
917                    self.deletion_set.insert_key_prefix(key_prefix);
918                }
919            }
920        }
921        Ok(())
922    }
923
924    /// Sets or inserts a value.
925    /// ```rust
926    /// # tokio_test::block_on(async {
927    /// # use linera_views::context::MemoryContext;
928    /// # use linera_views::key_value_store_view::KeyValueStoreView;
929    /// # use linera_views::views::View;
930    /// # let context = MemoryContext::new_for_testing(());
931    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
932    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
933    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
934    /// # })
935    /// ```
936    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
937        let mut batch = Batch::new();
938        batch.put_key_value_bytes(index, value);
939        self.write_batch(batch).await
940    }
941
942    /// Removes a value. If absent then the action has no effect.
943    /// ```rust
944    /// # tokio_test::block_on(async {
945    /// # use linera_views::context::MemoryContext;
946    /// # use linera_views::key_value_store_view::KeyValueStoreView;
947    /// # use linera_views::views::View;
948    /// # let context = MemoryContext::new_for_testing(());
949    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
950    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
951    /// view.remove(vec![0, 1]).await.unwrap();
952    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
953    /// # })
954    /// ```
955    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
956        let mut batch = Batch::new();
957        batch.delete_key(index);
958        self.write_batch(batch).await
959    }
960
961    /// Deletes a key prefix.
962    /// ```rust
963    /// # tokio_test::block_on(async {
964    /// # use linera_views::context::MemoryContext;
965    /// # use linera_views::key_value_store_view::KeyValueStoreView;
966    /// # use linera_views::views::View;
967    /// # let context = MemoryContext::new_for_testing(());
968    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
969    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
970    /// view.remove_by_prefix(vec![0]).await.unwrap();
971    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
972    /// # })
973    /// ```
974    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
975        let mut batch = Batch::new();
976        batch.delete_key_prefix(key_prefix);
977        self.write_batch(batch).await
978    }
979
980    /// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
981    /// ```rust
982    /// # tokio_test::block_on(async {
983    /// # use linera_views::context::MemoryContext;
984    /// # use linera_views::key_value_store_view::KeyValueStoreView;
985    /// # use linera_views::views::View;
986    /// # let context = MemoryContext::new_for_testing(());
987    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
988    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
989    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
990    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
991    /// assert_eq!(keys, vec![vec![1]]);
992    /// # })
993    /// ```
994    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
995        #[cfg(with_metrics)]
996        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
997        ensure!(
998            key_prefix.len() <= self.max_key_size(),
999            ViewError::KeyTooLong
1000        );
1001        let len = key_prefix.len();
1002        let key_prefix_full = self
1003            .context
1004            .base_key()
1005            .base_tag_index(KeyTag::Index as u8, key_prefix);
1006        let mut keys = Vec::new();
1007        let key_prefix_upper = get_upper_bound(key_prefix);
1008        let mut updates = self
1009            .updates
1010            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1011        let mut update = updates.next();
1012        if !self.deletion_set.delete_storage_first {
1013            let mut suffix_closed_set =
1014                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1015            for key in self
1016                .context
1017                .store()
1018                .find_keys_by_prefix(&key_prefix_full)
1019                .await?
1020                .iterator()
1021            {
1022                let key = key?;
1023                loop {
1024                    match update {
1025                        Some((update_key, update_value)) if &update_key[len..] <= key => {
1026                            if let Update::Set(_) = update_value {
1027                                keys.push(update_key[len..].to_vec());
1028                            }
1029                            update = updates.next();
1030                            if update_key[len..] == key[..] {
1031                                break;
1032                            }
1033                        }
1034                        _ => {
1035                            let mut key_with_prefix = key_prefix.to_vec();
1036                            key_with_prefix.extend_from_slice(key);
1037                            if !suffix_closed_set.find_key(&key_with_prefix) {
1038                                keys.push(key.to_vec());
1039                            }
1040                            break;
1041                        }
1042                    }
1043                }
1044            }
1045        }
1046        while let Some((update_key, update_value)) = update {
1047            if let Update::Set(_) = update_value {
1048                let update_key = update_key[len..].to_vec();
1049                keys.push(update_key);
1050            }
1051            update = updates.next();
1052        }
1053        Ok(keys)
1054    }
1055
1056    /// Iterates over all the key-value pairs, for keys matching the given prefix. The
1057    /// prefix is not included in the returned keys.
1058    /// ```rust
1059    /// # tokio_test::block_on(async {
1060    /// # use linera_views::context::MemoryContext;
1061    /// # use linera_views::key_value_store_view::KeyValueStoreView;
1062    /// # use linera_views::views::View;
1063    /// # let context = MemoryContext::new_for_testing(());
1064    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1065    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1066    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1067    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
1068    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
1069    /// # })
1070    /// ```
1071    pub async fn find_key_values_by_prefix(
1072        &self,
1073        key_prefix: &[u8],
1074    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
1075        #[cfg(with_metrics)]
1076        let _latency =
1077            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
1078        ensure!(
1079            key_prefix.len() <= self.max_key_size(),
1080            ViewError::KeyTooLong
1081        );
1082        let len = key_prefix.len();
1083        let key_prefix_full = self
1084            .context
1085            .base_key()
1086            .base_tag_index(KeyTag::Index as u8, key_prefix);
1087        let mut key_values = Vec::new();
1088        let key_prefix_upper = get_upper_bound(key_prefix);
1089        let mut updates = self
1090            .updates
1091            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1092        let mut update = updates.next();
1093        if !self.deletion_set.delete_storage_first {
1094            let mut suffix_closed_set =
1095                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1096            for entry in self
1097                .context
1098                .store()
1099                .find_key_values_by_prefix(&key_prefix_full)
1100                .await?
1101                .into_iterator_owned()
1102            {
1103                let (key, value) = entry?;
1104                loop {
1105                    match update {
1106                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
1107                            if let Update::Set(update_value) = update_value {
1108                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1109                                key_values.push(key_value);
1110                            }
1111                            update = updates.next();
1112                            if update_key[len..] == key[..] {
1113                                break;
1114                            }
1115                        }
1116                        _ => {
1117                            let mut key_with_prefix = key_prefix.to_vec();
1118                            key_with_prefix.extend_from_slice(&key);
1119                            if !suffix_closed_set.find_key(&key_with_prefix) {
1120                                key_values.push((key, value));
1121                            }
1122                            break;
1123                        }
1124                    }
1125                }
1126            }
1127        }
1128        while let Some((update_key, update_value)) = update {
1129            if let Update::Set(update_value) = update_value {
1130                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1131                key_values.push(key_value);
1132            }
1133            update = updates.next();
1134        }
1135        Ok(key_values)
1136    }
1137
1138    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1139        #[cfg(with_metrics)]
1140        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1141        let mut hasher = sha3::Sha3_256::default();
1142        let mut count = 0u32;
1143        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1144            count += 1;
1145            hasher.update_with_bytes(index)?;
1146            hasher.update_with_bytes(value)?;
1147            Ok(())
1148        })
1149        .await?;
1150        hasher.update_with_bcs_bytes(&count)?;
1151        Ok(hasher.finalize())
1152    }
1153}
1154
1155impl<C: Context> HashableView for KeyValueStoreView<C> {
1156    type Hasher = sha3::Sha3_256;
1157
1158    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1159        let hash = *self.hash.get_mut().unwrap();
1160        match hash {
1161            Some(hash) => Ok(hash),
1162            None => {
1163                let new_hash = self.compute_hash().await?;
1164                let hash = self.hash.get_mut().unwrap();
1165                *hash = Some(new_hash);
1166                Ok(new_hash)
1167            }
1168        }
1169    }
1170
1171    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1172        let hash = *self.hash.lock().unwrap();
1173        match hash {
1174            Some(hash) => Ok(hash),
1175            None => {
1176                let new_hash = self.compute_hash().await?;
1177                let mut hash = self.hash.lock().unwrap();
1178                *hash = Some(new_hash);
1179                Ok(new_hash)
1180            }
1181        }
1182    }
1183}
1184
1185/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
1186#[cfg(with_testing)]
1187#[derive(Debug, Clone)]
1188pub struct ViewContainer<C> {
1189    view: Arc<RwLock<KeyValueStoreView<C>>>,
1190}
1191
1192#[cfg(with_testing)]
1193impl<C> WithError for ViewContainer<C> {
1194    type Error = ViewContainerError;
1195}
1196
1197#[cfg(with_testing)]
1198/// The error type for [`ViewContainer`] operations.
1199#[derive(Error, Debug)]
1200pub enum ViewContainerError {
1201    /// View error.
1202    #[error(transparent)]
1203    ViewError(#[from] ViewError),
1204
1205    /// BCS serialization error.
1206    #[error(transparent)]
1207    BcsError(#[from] bcs::Error),
1208}
1209
1210#[cfg(with_testing)]
1211impl KeyValueStoreError for ViewContainerError {
1212    const BACKEND: &'static str = "view_container";
1213}
1214
1215#[cfg(with_testing)]
1216impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
1217    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
1218    type Keys = Vec<Vec<u8>>;
1219    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
1220
1221    fn max_stream_queries(&self) -> usize {
1222        1
1223    }
1224
1225    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
1226        let view = self.view.read().await;
1227        Ok(view.get(key).await?)
1228    }
1229
1230    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
1231        let view = self.view.read().await;
1232        Ok(view.contains_key(key).await?)
1233    }
1234
1235    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
1236        let view = self.view.read().await;
1237        Ok(view.contains_keys(keys).await?)
1238    }
1239
1240    async fn read_multi_values_bytes(
1241        &self,
1242        keys: Vec<Vec<u8>>,
1243    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
1244        let view = self.view.read().await;
1245        Ok(view.multi_get(keys).await?)
1246    }
1247
1248    async fn find_keys_by_prefix(
1249        &self,
1250        key_prefix: &[u8],
1251    ) -> Result<Self::Keys, ViewContainerError> {
1252        let view = self.view.read().await;
1253        Ok(view.find_keys_by_prefix(key_prefix).await?)
1254    }
1255
1256    async fn find_key_values_by_prefix(
1257        &self,
1258        key_prefix: &[u8],
1259    ) -> Result<Self::KeyValues, ViewContainerError> {
1260        let view = self.view.read().await;
1261        Ok(view.find_key_values_by_prefix(key_prefix).await?)
1262    }
1263}
1264
1265#[cfg(with_testing)]
1266impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
1267    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;
1268
1269    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
1270        let mut view = self.view.write().await;
1271        view.write_batch(batch).await?;
1272        let mut batch = Batch::new();
1273        view.flush(&mut batch)?;
1274        view.context()
1275            .store()
1276            .write_batch(batch)
1277            .await
1278            .map_err(ViewError::from)?;
1279        Ok(())
1280    }
1281
1282    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
1283        Ok(())
1284    }
1285}
1286
1287#[cfg(with_testing)]
1288impl<C: Context> ViewContainer<C> {
1289    /// Creates a [`ViewContainer`].
1290    pub async fn new(context: C) -> Result<Self, ViewError> {
1291        let view = KeyValueStoreView::load(context).await?;
1292        let view = Arc::new(RwLock::new(view));
1293        Ok(Self { view })
1294    }
1295}