linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! We implement two types:
5//! 1) The first type `KeyValueStoreView` implements View and the function of `KeyValueStore`.
6//!
7//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
8//!    * The `Clone` trait
9//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
10//!    * a `write_batch` that writes in the context instead of writing of the view.
11//!
12//! Currently, that second type is only used for tests.
13//!
14//! Key tags to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24    batch::{Batch, WriteOperation},
25    common::{
26        from_bytes_option, from_bytes_option_or_default, get_interval, get_upper_bound,
27        DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28    },
29    context::Context,
30    map_view::ByteMapView,
31    store::ReadableKeyValueStore,
32    views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
33};
34
35#[cfg(with_metrics)]
36mod metrics {
37    use std::sync::LazyLock;
38
39    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
40    use prometheus::HistogramVec;
41
42    /// The latency of hash computation
43    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
44        register_histogram_vec(
45            "key_value_store_view_hash_latency",
46            "KeyValueStoreView hash latency",
47            &[],
48            exponential_bucket_latencies(5.0),
49        )
50    });
51
52    /// The latency of get operation
53    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
54        register_histogram_vec(
55            "key_value_store_view_get_latency",
56            "KeyValueStoreView get latency",
57            &[],
58            exponential_bucket_latencies(5.0),
59        )
60    });
61
62    /// The latency of multi get
63    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
64        LazyLock::new(|| {
65            register_histogram_vec(
66                "key_value_store_view_multi_get_latency",
67                "KeyValueStoreView multi get latency",
68                &[],
69                exponential_bucket_latencies(5.0),
70            )
71        });
72
73    /// The latency of contains key
74    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
75        LazyLock::new(|| {
76            register_histogram_vec(
77                "key_value_store_view_contains_key_latency",
78                "KeyValueStoreView contains key latency",
79                &[],
80                exponential_bucket_latencies(5.0),
81            )
82        });
83
84    /// The latency of contains keys
85    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
86        LazyLock::new(|| {
87            register_histogram_vec(
88                "key_value_store_view_contains_keys_latency",
89                "KeyValueStoreView contains keys latency",
90                &[],
91                exponential_bucket_latencies(5.0),
92            )
93        });
94
95    /// The latency of find keys by prefix operation
96    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
97        LazyLock::new(|| {
98            register_histogram_vec(
99                "key_value_store_view_find_keys_by_prefix_latency",
100                "KeyValueStoreView find keys by prefix latency",
101                &[],
102                exponential_bucket_latencies(5.0),
103            )
104        });
105
106    /// The latency of find key values by prefix operation
107    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
108        LazyLock::new(|| {
109            register_histogram_vec(
110                "key_value_store_view_find_key_values_by_prefix_latency",
111                "KeyValueStoreView find key values by prefix latency",
112                &[],
113                exponential_bucket_latencies(5.0),
114            )
115        });
116
117    /// The latency of write batch operation
118    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
119        LazyLock::new(|| {
120            register_histogram_vec(
121                "key_value_store_view_write_batch_latency",
122                "KeyValueStoreView write batch latency",
123                &[],
124                exponential_bucket_latencies(5.0),
125            )
126        });
127}
128
129#[cfg(with_testing)]
130use {
131    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
132    async_lock::RwLock,
133    std::sync::Arc,
134    thiserror::Error,
135};
136
137#[repr(u8)]
138enum KeyTag {
139    /// Prefix for the indices of the view.
140    Index = MIN_VIEW_TAG,
141    /// The total stored size
142    TotalSize,
143    /// The prefix where the sizes are being stored
144    Sizes,
145    /// Prefix for the hash.
146    Hash,
147}
148
149/// A pair containing the key and value size.
150#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
151pub struct SizeData {
152    /// The size of the key
153    pub key: u32,
154    /// The size of the value
155    pub value: u32,
156}
157
158impl SizeData {
159    /// Sums both terms
160    pub fn sum(&mut self) -> u32 {
161        self.key + self.value
162    }
163
164    /// Adds a size to `self`
165    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
166        self.key = self
167            .key
168            .checked_add(size.key)
169            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
170        self.value = self
171            .value
172            .checked_add(size.value)
173            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
174        Ok(())
175    }
176
177    /// Subtracts a size from `self`
178    pub fn sub_assign(&mut self, size: SizeData) {
179        self.key -= size.key;
180        self.value -= size.value;
181    }
182}
183
184/// A view that represents the functions of `KeyValueStore`.
185///
186/// Comment on the data set:
187/// In order to work, the view needs to store the updates and deleted prefixes.
188/// The updates and deleted prefixes have to be coherent. This means:
189/// * If an index is deleted by one in deleted prefixes then it should not be present
190///   in the updates at all.
191/// * [`DeletePrefix::key_prefix`][entry1] should not dominate anyone. That is if we have `[0,2]`
192///   then we should not have `[0,2,3]` since it would be dominated by the preceding.
193///
194/// With that we have:
195/// * in order to test if an `index` is deleted by a prefix we compute the highest deleted prefix `dp`
196///   such that `dp <= index`.
197///   If `dp` is indeed a prefix then we conclude that `index` is deleted, otherwise not.
198///   The no domination is essential here.
199///
200/// [entry1]: crate::batch::WriteOperation::DeletePrefix
201#[derive(Debug)]
202pub struct KeyValueStoreView<C> {
203    context: C,
204    deletion_set: DeletionSet,
205    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
206    stored_total_size: SizeData,
207    total_size: SizeData,
208    sizes: ByteMapView<C, u32>,
209    stored_hash: Option<HasherOutput>,
210    hash: Mutex<Option<HasherOutput>>,
211}
212
213impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
214    type Target = KeyValueStoreView<C2>;
215
216    async fn with_context(
217        &mut self,
218        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
219    ) -> Self::Target {
220        let hash = *self.hash.lock().unwrap();
221        KeyValueStoreView {
222            context: ctx.clone()(self.context()),
223            deletion_set: self.deletion_set.clone(),
224            updates: self.updates.clone(),
225            stored_total_size: self.stored_total_size,
226            total_size: self.total_size,
227            sizes: self.sizes.with_context(ctx.clone()).await,
228            stored_hash: self.stored_hash,
229            hash: Mutex::new(hash),
230        }
231    }
232}
233
234impl<C: Context> View for KeyValueStoreView<C> {
235    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;
236
237    type Context = C;
238
239    fn context(&self) -> &C {
240        &self.context
241    }
242
243    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
244        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
245        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
246        let mut v = vec![key_hash, key_total_size];
247        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
248        let context_sizes = context.clone_with_base_key(base_key);
249        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
250        Ok(v)
251    }
252
253    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
254        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
255        let total_size =
256            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
257        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
258        let context_sizes = context.clone_with_base_key(base_key);
259        let sizes = ByteMapView::post_load(
260            context_sizes,
261            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
262        )?;
263        Ok(Self {
264            context,
265            deletion_set: DeletionSet::new(),
266            updates: BTreeMap::new(),
267            stored_total_size: total_size,
268            total_size,
269            sizes,
270            stored_hash: hash,
271            hash: Mutex::new(hash),
272        })
273    }
274
275    async fn load(context: C) -> Result<Self, ViewError> {
276        let keys = Self::pre_load(&context)?;
277        let values = context.store().read_multi_values_bytes(keys).await?;
278        Self::post_load(context, &values)
279    }
280
281    fn rollback(&mut self) {
282        self.deletion_set.rollback();
283        self.updates.clear();
284        self.total_size = self.stored_total_size;
285        self.sizes.rollback();
286        *self.hash.get_mut().unwrap() = self.stored_hash;
287    }
288
289    async fn has_pending_changes(&self) -> bool {
290        if self.deletion_set.has_pending_changes() {
291            return true;
292        }
293        if !self.updates.is_empty() {
294            return true;
295        }
296        if self.stored_total_size != self.total_size {
297            return true;
298        }
299        if self.sizes.has_pending_changes().await {
300            return true;
301        }
302        let hash = self.hash.lock().unwrap();
303        self.stored_hash != *hash
304    }
305
306    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
307        let mut delete_view = false;
308        if self.deletion_set.delete_storage_first {
309            delete_view = true;
310            self.stored_total_size = SizeData::default();
311            batch.delete_key_prefix(self.context.base_key().bytes.clone());
312            for (index, update) in mem::take(&mut self.updates) {
313                if let Update::Set(value) = update {
314                    let key = self
315                        .context
316                        .base_key()
317                        .base_tag_index(KeyTag::Index as u8, &index);
318                    batch.put_key_value_bytes(key, value);
319                    delete_view = false;
320                }
321            }
322            self.stored_hash = None
323        } else {
324            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
325                let key = self
326                    .context
327                    .base_key()
328                    .base_tag_index(KeyTag::Index as u8, &index);
329                batch.delete_key_prefix(key);
330            }
331            for (index, update) in mem::take(&mut self.updates) {
332                let key = self
333                    .context
334                    .base_key()
335                    .base_tag_index(KeyTag::Index as u8, &index);
336                match update {
337                    Update::Removed => batch.delete_key(key),
338                    Update::Set(value) => batch.put_key_value_bytes(key, value),
339                }
340            }
341        }
342        self.sizes.flush(batch)?;
343        let hash = *self.hash.get_mut().unwrap();
344        if self.stored_hash != hash {
345            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
346            match hash {
347                None => batch.delete_key(key),
348                Some(hash) => batch.put_key_value(key, &hash)?,
349            }
350            self.stored_hash = hash;
351        }
352        if self.stored_total_size != self.total_size {
353            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
354            batch.put_key_value(key, &self.total_size)?;
355            self.stored_total_size = self.total_size;
356        }
357        self.deletion_set.delete_storage_first = false;
358        Ok(delete_view)
359    }
360
361    fn clear(&mut self) {
362        self.deletion_set.clear();
363        self.updates.clear();
364        self.total_size = SizeData::default();
365        self.sizes.clear();
366        *self.hash.get_mut().unwrap() = None;
367    }
368}
369
370impl<C: Context> ClonableView for KeyValueStoreView<C> {
371    fn clone_unchecked(&mut self) -> Self {
372        KeyValueStoreView {
373            context: self.context.clone(),
374            deletion_set: self.deletion_set.clone(),
375            updates: self.updates.clone(),
376            stored_total_size: self.stored_total_size,
377            total_size: self.total_size,
378            sizes: self.sizes.clone_unchecked(),
379            stored_hash: self.stored_hash,
380            hash: Mutex::new(*self.hash.get_mut().unwrap()),
381        }
382    }
383}
384
385impl<C: Context> KeyValueStoreView<C> {
386    fn max_key_size(&self) -> usize {
387        let prefix_len = self.context.base_key().bytes.len();
388        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
389    }
390
391    /// Getting the total sizes that will be used for keys and values when stored
392    /// ```rust
393    /// # tokio_test::block_on(async {
394    /// # use linera_views::context::MemoryContext;
395    /// # use linera_views::key_value_store_view::{KeyValueStoreView, SizeData};
396    /// # use linera_views::views::View;
397    /// # let context = MemoryContext::new_for_testing(());
398    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
399    /// let total_size = view.total_size();
400    /// assert_eq!(total_size, SizeData::default());
401    /// # })
402    /// ```
403    pub fn total_size(&self) -> SizeData {
404        self.total_size
405    }
406
407    /// Applies the function f over all indices. If the function f returns
408    /// false, then the loop ends prematurely.
409    /// ```rust
410    /// # tokio_test::block_on(async {
411    /// # use linera_views::context::MemoryContext;
412    /// # use linera_views::key_value_store_view::KeyValueStoreView;
413    /// # use linera_views::views::View;
414    /// # let context = MemoryContext::new_for_testing(());
415    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
416    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
417    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
418    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
419    /// let mut count = 0;
420    /// view.for_each_index_while(|_key| {
421    ///     count += 1;
422    ///     Ok(count < 2)
423    /// })
424    /// .await
425    /// .unwrap();
426    /// assert_eq!(count, 2);
427    /// # })
428    /// ```
429    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
430    where
431        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
432    {
433        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
434        let mut updates = self.updates.iter();
435        let mut update = updates.next();
436        if !self.deletion_set.delete_storage_first {
437            let mut suffix_closed_set =
438                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
439            for index in self
440                .context
441                .store()
442                .find_keys_by_prefix(&key_prefix)
443                .await?
444            {
445                loop {
446                    match update {
447                        Some((key, value)) if key <= &index => {
448                            if let Update::Set(_) = value {
449                                if !f(key)? {
450                                    return Ok(());
451                                }
452                            }
453                            update = updates.next();
454                            if key == &index {
455                                break;
456                            }
457                        }
458                        _ => {
459                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
460                                return Ok(());
461                            }
462                            break;
463                        }
464                    }
465                }
466            }
467        }
468        while let Some((key, value)) = update {
469            if let Update::Set(_) = value {
470                if !f(key)? {
471                    return Ok(());
472                }
473            }
474            update = updates.next();
475        }
476        Ok(())
477    }
478
479    /// Applies the function f over all indices.
480    /// ```rust
481    /// # tokio_test::block_on(async {
482    /// # use linera_views::context::MemoryContext;
483    /// # use linera_views::key_value_store_view::KeyValueStoreView;
484    /// # use linera_views::views::View;
485    /// # let context = MemoryContext::new_for_testing(());
486    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
487    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
488    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
489    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
490    /// let mut count = 0;
491    /// view.for_each_index(|_key| {
492    ///     count += 1;
493    ///     Ok(())
494    /// })
495    /// .await
496    /// .unwrap();
497    /// assert_eq!(count, 3);
498    /// # })
499    /// ```
500    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
501    where
502        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
503    {
504        self.for_each_index_while(|key| {
505            f(key)?;
506            Ok(true)
507        })
508        .await
509    }
510
511    /// Applies the function f over all index/value pairs.
512    /// If the function f returns false then the loop ends prematurely.
513    /// ```rust
514    /// # tokio_test::block_on(async {
515    /// # use linera_views::context::MemoryContext;
516    /// # use linera_views::key_value_store_view::KeyValueStoreView;
517    /// # use linera_views::views::View;
518    /// # let context = MemoryContext::new_for_testing(());
519    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
520    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
521    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
522    /// let mut values = Vec::new();
523    /// view.for_each_index_value_while(|_key, value| {
524    ///     values.push(value.to_vec());
525    ///     Ok(values.len() < 1)
526    /// })
527    /// .await
528    /// .unwrap();
529    /// assert_eq!(values, vec![vec![0]]);
530    /// # })
531    /// ```
532    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
533    where
534        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
535    {
536        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
537        let mut updates = self.updates.iter();
538        let mut update = updates.next();
539        if !self.deletion_set.delete_storage_first {
540            let mut suffix_closed_set =
541                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
542            for entry in self
543                .context
544                .store()
545                .find_key_values_by_prefix(&key_prefix)
546                .await?
547            {
548                let (index, index_val) = entry;
549                loop {
550                    match update {
551                        Some((key, value)) if key <= &index => {
552                            if let Update::Set(value) = value {
553                                if !f(key, value)? {
554                                    return Ok(());
555                                }
556                            }
557                            update = updates.next();
558                            if key == &index {
559                                break;
560                            }
561                        }
562                        _ => {
563                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
564                                return Ok(());
565                            }
566                            break;
567                        }
568                    }
569                }
570            }
571        }
572        while let Some((key, value)) = update {
573            if let Update::Set(value) = value {
574                if !f(key, value)? {
575                    return Ok(());
576                }
577            }
578            update = updates.next();
579        }
580        Ok(())
581    }
582
583    /// Applies the function f over all index/value pairs.
584    /// ```rust
585    /// # tokio_test::block_on(async {
586    /// # use linera_views::context::MemoryContext;
587    /// # use linera_views::key_value_store_view::KeyValueStoreView;
588    /// # use linera_views::views::View;
589    /// # let context = MemoryContext::new_for_testing(());
590    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
591    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
592    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
593    /// let mut part_keys = Vec::new();
594    /// view.for_each_index_while(|key| {
595    ///     part_keys.push(key.to_vec());
596    ///     Ok(part_keys.len() < 1)
597    /// })
598    /// .await
599    /// .unwrap();
600    /// assert_eq!(part_keys, vec![vec![0, 1]]);
601    /// # })
602    /// ```
603    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
604    where
605        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
606    {
607        self.for_each_index_value_while(|key, value| {
608            f(key, value)?;
609            Ok(true)
610        })
611        .await
612    }
613
614    /// Returns the list of indices in lexicographic order.
615    /// ```rust
616    /// # tokio_test::block_on(async {
617    /// # use linera_views::context::MemoryContext;
618    /// # use linera_views::key_value_store_view::KeyValueStoreView;
619    /// # use linera_views::views::View;
620    /// # let context = MemoryContext::new_for_testing(());
621    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
622    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
623    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
624    /// let indices = view.indices().await.unwrap();
625    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
626    /// # })
627    /// ```
628    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
629        let mut indices = Vec::new();
630        self.for_each_index(|index| {
631            indices.push(index.to_vec());
632            Ok(())
633        })
634        .await?;
635        Ok(indices)
636    }
637
638    /// Returns the list of indices and values in lexicographic order.
639    /// ```rust
640    /// # tokio_test::block_on(async {
641    /// # use linera_views::context::MemoryContext;
642    /// # use linera_views::key_value_store_view::KeyValueStoreView;
643    /// # use linera_views::views::View;
644    /// # let context = MemoryContext::new_for_testing(());
645    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
646    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
647    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
648    /// let key_values = view.indices().await.unwrap();
649    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
650    /// # })
651    /// ```
652    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
653        let mut index_values = Vec::new();
654        self.for_each_index_value(|index, value| {
655            index_values.push((index.to_vec(), value.to_vec()));
656            Ok(())
657        })
658        .await?;
659        Ok(index_values)
660    }
661
662    /// Returns the number of entries.
663    /// ```rust
664    /// # tokio_test::block_on(async {
665    /// # use linera_views::context::MemoryContext;
666    /// # use linera_views::key_value_store_view::KeyValueStoreView;
667    /// # use linera_views::views::View;
668    /// # let context = MemoryContext::new_for_testing(());
669    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
670    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
671    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
672    /// let count = view.count().await.unwrap();
673    /// assert_eq!(count, 2);
674    /// # })
675    /// ```
676    pub async fn count(&self) -> Result<usize, ViewError> {
677        let mut count = 0;
678        self.for_each_index(|_index| {
679            count += 1;
680            Ok(())
681        })
682        .await?;
683        Ok(count)
684    }
685
686    /// Obtains the value at the given index, if any.
687    /// ```rust
688    /// # tokio_test::block_on(async {
689    /// # use linera_views::context::MemoryContext;
690    /// # use linera_views::key_value_store_view::KeyValueStoreView;
691    /// # use linera_views::views::View;
692    /// # let context = MemoryContext::new_for_testing(());
693    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
694    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
695    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
696    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
697    /// # })
698    /// ```
699    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
700        #[cfg(with_metrics)]
701        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
702        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
703        if let Some(update) = self.updates.get(index) {
704            let value = match update {
705                Update::Removed => None,
706                Update::Set(value) => Some(value.clone()),
707            };
708            return Ok(value);
709        }
710        if self.deletion_set.contains_prefix_of(index) {
711            return Ok(None);
712        }
713        let key = self
714            .context
715            .base_key()
716            .base_tag_index(KeyTag::Index as u8, index);
717        Ok(self.context.store().read_value_bytes(&key).await?)
718    }
719
720    /// Tests whether the store contains a specific index.
721    /// ```rust
722    /// # tokio_test::block_on(async {
723    /// # use linera_views::context::MemoryContext;
724    /// # use linera_views::key_value_store_view::KeyValueStoreView;
725    /// # use linera_views::views::View;
726    /// # let context = MemoryContext::new_for_testing(());
727    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
728    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
729    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
730    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
731    /// # })
732    /// ```
733    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
734        #[cfg(with_metrics)]
735        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
736        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
737        if let Some(update) = self.updates.get(index) {
738            let test = match update {
739                Update::Removed => false,
740                Update::Set(_value) => true,
741            };
742            return Ok(test);
743        }
744        if self.deletion_set.contains_prefix_of(index) {
745            return Ok(false);
746        }
747        let key = self
748            .context
749            .base_key()
750            .base_tag_index(KeyTag::Index as u8, index);
751        Ok(self.context.store().contains_key(&key).await?)
752    }
753
754    /// Tests whether the view contains a range of indices
755    /// ```rust
756    /// # tokio_test::block_on(async {
757    /// # use linera_views::context::MemoryContext;
758    /// # use linera_views::key_value_store_view::KeyValueStoreView;
759    /// # use linera_views::views::View;
760    /// # let context = MemoryContext::new_for_testing(());
761    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
762    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
763    /// let keys = vec![vec![0, 1], vec![0, 2]];
764    /// let results = view.contains_keys(keys).await.unwrap();
765    /// assert_eq!(results, vec![true, false]);
766    /// # })
767    /// ```
768    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
769        #[cfg(with_metrics)]
770        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
771        let mut results = Vec::with_capacity(indices.len());
772        let mut missed_indices = Vec::new();
773        let mut vector_query = Vec::new();
774        for (i, index) in indices.into_iter().enumerate() {
775            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
776            if let Some(update) = self.updates.get(&index) {
777                let value = match update {
778                    Update::Removed => false,
779                    Update::Set(_) => true,
780                };
781                results.push(value);
782            } else {
783                results.push(false);
784                if !self.deletion_set.contains_prefix_of(&index) {
785                    missed_indices.push(i);
786                    let key = self
787                        .context
788                        .base_key()
789                        .base_tag_index(KeyTag::Index as u8, &index);
790                    vector_query.push(key);
791                }
792            }
793        }
794        let values = self.context.store().contains_keys(vector_query).await?;
795        for (i, value) in missed_indices.into_iter().zip(values) {
796            results[i] = value;
797        }
798        Ok(results)
799    }
800
801    /// Obtains the values of a range of indices
802    /// ```rust
803    /// # tokio_test::block_on(async {
804    /// # use linera_views::context::MemoryContext;
805    /// # use linera_views::key_value_store_view::KeyValueStoreView;
806    /// # use linera_views::views::View;
807    /// # let context = MemoryContext::new_for_testing(());
808    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
809    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
810    /// assert_eq!(
811    ///     view.multi_get(vec![vec![0, 1], vec![0, 2]]).await.unwrap(),
812    ///     vec![Some(vec![42]), None]
813    /// );
814    /// # })
815    /// ```
816    pub async fn multi_get(
817        &self,
818        indices: Vec<Vec<u8>>,
819    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
820        #[cfg(with_metrics)]
821        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
822        let mut result = Vec::with_capacity(indices.len());
823        let mut missed_indices = Vec::new();
824        let mut vector_query = Vec::new();
825        for (i, index) in indices.into_iter().enumerate() {
826            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
827            if let Some(update) = self.updates.get(&index) {
828                let value = match update {
829                    Update::Removed => None,
830                    Update::Set(value) => Some(value.clone()),
831                };
832                result.push(value);
833            } else {
834                result.push(None);
835                if !self.deletion_set.contains_prefix_of(&index) {
836                    missed_indices.push(i);
837                    let key = self
838                        .context
839                        .base_key()
840                        .base_tag_index(KeyTag::Index as u8, &index);
841                    vector_query.push(key);
842                }
843            }
844        }
845        let values = self
846            .context
847            .store()
848            .read_multi_values_bytes(vector_query)
849            .await?;
850        for (i, value) in missed_indices.into_iter().zip(values) {
851            result[i] = value;
852        }
853        Ok(result)
854    }
855
856    /// Applies the given batch of `crate::common::WriteOperation`.
857    /// ```rust
858    /// # tokio_test::block_on(async {
859    /// # use linera_views::context::MemoryContext;
860    /// # use linera_views::key_value_store_view::KeyValueStoreView;
861    /// # use linera_views::batch::Batch;
862    /// # use linera_views::views::View;
863    /// # let context = MemoryContext::new_for_testing(());
864    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
865    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
866    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
867    /// let mut batch = Batch::new();
868    /// batch.delete_key_prefix(vec![0]);
869    /// view.write_batch(batch).await.unwrap();
870    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
871    /// assert_eq!(key_values, vec![]);
872    /// # })
873    /// ```
874    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
875        #[cfg(with_metrics)]
876        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
877        *self.hash.get_mut().unwrap() = None;
878        let max_key_size = self.max_key_size();
879        for operation in batch.operations {
880            match operation {
881                WriteOperation::Delete { key } => {
882                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
883                    if let Some(value) = self.sizes.get(&key).await? {
884                        let entry_size = SizeData {
885                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
886                            value,
887                        };
888                        self.total_size.sub_assign(entry_size);
889                    }
890                    self.sizes.remove(key.clone());
891                    if self.deletion_set.contains_prefix_of(&key) {
892                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
893                        self.updates.remove(&key);
894                    } else {
895                        self.updates.insert(key, Update::Removed);
896                    }
897                }
898                WriteOperation::Put { key, value } => {
899                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
900                    let entry_size = SizeData {
901                        key: key.len() as u32,
902                        value: value.len() as u32,
903                    };
904                    self.total_size.add_assign(entry_size)?;
905                    if let Some(value) = self.sizes.get(&key).await? {
906                        let entry_size = SizeData {
907                            key: key.len() as u32,
908                            value,
909                        };
910                        self.total_size.sub_assign(entry_size);
911                    }
912                    self.sizes.insert(key.clone(), entry_size.value);
913                    self.updates.insert(key, Update::Set(value));
914                }
915                WriteOperation::DeletePrefix { key_prefix } => {
916                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
917                    let key_list = self
918                        .updates
919                        .range(get_interval(key_prefix.clone()))
920                        .map(|x| x.0.to_vec())
921                        .collect::<Vec<_>>();
922                    for key in key_list {
923                        self.updates.remove(&key);
924                    }
925                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
926                    for (key, value) in key_values {
927                        let entry_size = SizeData {
928                            key: key.len() as u32,
929                            value,
930                        };
931                        self.total_size.sub_assign(entry_size);
932                        self.sizes.remove(key);
933                    }
934                    self.sizes.remove_by_prefix(key_prefix.clone());
935                    self.deletion_set.insert_key_prefix(key_prefix);
936                }
937            }
938        }
939        Ok(())
940    }
941
942    /// Sets or inserts a value.
943    /// ```rust
944    /// # tokio_test::block_on(async {
945    /// # use linera_views::context::MemoryContext;
946    /// # use linera_views::key_value_store_view::KeyValueStoreView;
947    /// # use linera_views::views::View;
948    /// # let context = MemoryContext::new_for_testing(());
949    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
950    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
951    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
952    /// # })
953    /// ```
954    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
955        let mut batch = Batch::new();
956        batch.put_key_value_bytes(index, value);
957        self.write_batch(batch).await
958    }
959
960    /// Removes a value. If absent then the action has no effect.
961    /// ```rust
962    /// # tokio_test::block_on(async {
963    /// # use linera_views::context::MemoryContext;
964    /// # use linera_views::key_value_store_view::KeyValueStoreView;
965    /// # use linera_views::views::View;
966    /// # let context = MemoryContext::new_for_testing(());
967    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
968    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
969    /// view.remove(vec![0, 1]).await.unwrap();
970    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
971    /// # })
972    /// ```
973    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
974        let mut batch = Batch::new();
975        batch.delete_key(index);
976        self.write_batch(batch).await
977    }
978
979    /// Deletes a key prefix.
980    /// ```rust
981    /// # tokio_test::block_on(async {
982    /// # use linera_views::context::MemoryContext;
983    /// # use linera_views::key_value_store_view::KeyValueStoreView;
984    /// # use linera_views::views::View;
985    /// # let context = MemoryContext::new_for_testing(());
986    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
987    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
988    /// view.remove_by_prefix(vec![0]).await.unwrap();
989    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
990    /// # })
991    /// ```
992    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
993        let mut batch = Batch::new();
994        batch.delete_key_prefix(key_prefix);
995        self.write_batch(batch).await
996    }
997
998    /// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
999    /// ```rust
1000    /// # tokio_test::block_on(async {
1001    /// # use linera_views::context::MemoryContext;
1002    /// # use linera_views::key_value_store_view::KeyValueStoreView;
1003    /// # use linera_views::views::View;
1004    /// # let context = MemoryContext::new_for_testing(());
1005    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1006    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1007    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1008    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
1009    /// assert_eq!(keys, vec![vec![1]]);
1010    /// # })
1011    /// ```
1012    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
1013        #[cfg(with_metrics)]
1014        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
1015        ensure!(
1016            key_prefix.len() <= self.max_key_size(),
1017            ViewError::KeyTooLong
1018        );
1019        let len = key_prefix.len();
1020        let key_prefix_full = self
1021            .context
1022            .base_key()
1023            .base_tag_index(KeyTag::Index as u8, key_prefix);
1024        let mut keys = Vec::new();
1025        let key_prefix_upper = get_upper_bound(key_prefix);
1026        let mut updates = self
1027            .updates
1028            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1029        let mut update = updates.next();
1030        if !self.deletion_set.delete_storage_first {
1031            let mut suffix_closed_set =
1032                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1033            for key in self
1034                .context
1035                .store()
1036                .find_keys_by_prefix(&key_prefix_full)
1037                .await?
1038            {
1039                loop {
1040                    match update {
1041                        Some((update_key, update_value))
1042                            if &update_key[len..] <= key.as_slice() =>
1043                        {
1044                            if let Update::Set(_) = update_value {
1045                                keys.push(update_key[len..].to_vec());
1046                            }
1047                            update = updates.next();
1048                            if update_key[len..] == key[..] {
1049                                break;
1050                            }
1051                        }
1052                        _ => {
1053                            let mut key_with_prefix = key_prefix.to_vec();
1054                            key_with_prefix.extend_from_slice(&key);
1055                            if !suffix_closed_set.find_key(&key_with_prefix) {
1056                                keys.push(key);
1057                            }
1058                            break;
1059                        }
1060                    }
1061                }
1062            }
1063        }
1064        while let Some((update_key, update_value)) = update {
1065            if let Update::Set(_) = update_value {
1066                let update_key = update_key[len..].to_vec();
1067                keys.push(update_key);
1068            }
1069            update = updates.next();
1070        }
1071        Ok(keys)
1072    }
1073
1074    /// Iterates over all the key-value pairs, for keys matching the given prefix. The
1075    /// prefix is not included in the returned keys.
1076    /// ```rust
1077    /// # tokio_test::block_on(async {
1078    /// # use linera_views::context::MemoryContext;
1079    /// # use linera_views::key_value_store_view::KeyValueStoreView;
1080    /// # use linera_views::views::View;
1081    /// # let context = MemoryContext::new_for_testing(());
1082    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1083    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1084    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1085    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
1086    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
1087    /// # })
1088    /// ```
1089    pub async fn find_key_values_by_prefix(
1090        &self,
1091        key_prefix: &[u8],
1092    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
1093        #[cfg(with_metrics)]
1094        let _latency =
1095            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
1096        ensure!(
1097            key_prefix.len() <= self.max_key_size(),
1098            ViewError::KeyTooLong
1099        );
1100        let len = key_prefix.len();
1101        let key_prefix_full = self
1102            .context
1103            .base_key()
1104            .base_tag_index(KeyTag::Index as u8, key_prefix);
1105        let mut key_values = Vec::new();
1106        let key_prefix_upper = get_upper_bound(key_prefix);
1107        let mut updates = self
1108            .updates
1109            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1110        let mut update = updates.next();
1111        if !self.deletion_set.delete_storage_first {
1112            let mut suffix_closed_set =
1113                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1114            for entry in self
1115                .context
1116                .store()
1117                .find_key_values_by_prefix(&key_prefix_full)
1118                .await?
1119            {
1120                let (key, value) = entry;
1121                loop {
1122                    match update {
1123                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
1124                            if let Update::Set(update_value) = update_value {
1125                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1126                                key_values.push(key_value);
1127                            }
1128                            update = updates.next();
1129                            if update_key[len..] == key[..] {
1130                                break;
1131                            }
1132                        }
1133                        _ => {
1134                            let mut key_with_prefix = key_prefix.to_vec();
1135                            key_with_prefix.extend_from_slice(&key);
1136                            if !suffix_closed_set.find_key(&key_with_prefix) {
1137                                key_values.push((key, value));
1138                            }
1139                            break;
1140                        }
1141                    }
1142                }
1143            }
1144        }
1145        while let Some((update_key, update_value)) = update {
1146            if let Update::Set(update_value) = update_value {
1147                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1148                key_values.push(key_value);
1149            }
1150            update = updates.next();
1151        }
1152        Ok(key_values)
1153    }
1154
1155    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1156        #[cfg(with_metrics)]
1157        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1158        let mut hasher = sha3::Sha3_256::default();
1159        let mut count = 0u32;
1160        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1161            count += 1;
1162            hasher.update_with_bytes(index)?;
1163            hasher.update_with_bytes(value)?;
1164            Ok(())
1165        })
1166        .await?;
1167        hasher.update_with_bcs_bytes(&count)?;
1168        Ok(hasher.finalize())
1169    }
1170}
1171
1172impl<C: Context> HashableView for KeyValueStoreView<C> {
1173    type Hasher = sha3::Sha3_256;
1174
1175    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1176        let hash = *self.hash.get_mut().unwrap();
1177        match hash {
1178            Some(hash) => Ok(hash),
1179            None => {
1180                let new_hash = self.compute_hash().await?;
1181                let hash = self.hash.get_mut().unwrap();
1182                *hash = Some(new_hash);
1183                Ok(new_hash)
1184            }
1185        }
1186    }
1187
1188    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1189        let hash = *self.hash.lock().unwrap();
1190        match hash {
1191            Some(hash) => Ok(hash),
1192            None => {
1193                let new_hash = self.compute_hash().await?;
1194                let mut hash = self.hash.lock().unwrap();
1195                *hash = Some(new_hash);
1196                Ok(new_hash)
1197            }
1198        }
1199    }
1200}
1201
1202/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
1203#[cfg(with_testing)]
1204#[derive(Debug, Clone)]
1205pub struct ViewContainer<C> {
1206    view: Arc<RwLock<KeyValueStoreView<C>>>,
1207}
1208
1209#[cfg(with_testing)]
1210impl<C> WithError for ViewContainer<C> {
1211    type Error = ViewContainerError;
1212}
1213
1214#[cfg(with_testing)]
1215/// The error type for [`ViewContainer`] operations.
1216#[derive(Error, Debug)]
1217pub enum ViewContainerError {
1218    /// View error.
1219    #[error(transparent)]
1220    ViewError(#[from] ViewError),
1221
1222    /// BCS serialization error.
1223    #[error(transparent)]
1224    BcsError(#[from] bcs::Error),
1225}
1226
1227#[cfg(with_testing)]
1228impl KeyValueStoreError for ViewContainerError {
1229    const BACKEND: &'static str = "view_container";
1230}
1231
1232#[cfg(with_testing)]
1233impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
1234    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
1235
1236    fn max_stream_queries(&self) -> usize {
1237        1
1238    }
1239
1240    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
1241        let view = self.view.read().await;
1242        Ok(view.get(key).await?)
1243    }
1244
1245    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
1246        let view = self.view.read().await;
1247        Ok(view.contains_key(key).await?)
1248    }
1249
1250    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
1251        let view = self.view.read().await;
1252        Ok(view.contains_keys(keys).await?)
1253    }
1254
1255    async fn read_multi_values_bytes(
1256        &self,
1257        keys: Vec<Vec<u8>>,
1258    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
1259        let view = self.view.read().await;
1260        Ok(view.multi_get(keys).await?)
1261    }
1262
1263    async fn find_keys_by_prefix(
1264        &self,
1265        key_prefix: &[u8],
1266    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
1267        let view = self.view.read().await;
1268        Ok(view.find_keys_by_prefix(key_prefix).await?)
1269    }
1270
1271    async fn find_key_values_by_prefix(
1272        &self,
1273        key_prefix: &[u8],
1274    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
1275        let view = self.view.read().await;
1276        Ok(view.find_key_values_by_prefix(key_prefix).await?)
1277    }
1278}
1279
1280#[cfg(with_testing)]
1281impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
1282    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;
1283
1284    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
1285        let mut view = self.view.write().await;
1286        view.write_batch(batch).await?;
1287        let mut batch = Batch::new();
1288        view.flush(&mut batch)?;
1289        view.context()
1290            .store()
1291            .write_batch(batch)
1292            .await
1293            .map_err(ViewError::from)?;
1294        Ok(())
1295    }
1296
1297    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
1298        Ok(())
1299    }
1300}
1301
1302#[cfg(with_testing)]
1303impl<C: Context> ViewContainer<C> {
1304    /// Creates a [`ViewContainer`].
1305    pub async fn new(context: C) -> Result<Self, ViewError> {
1306        let view = KeyValueStoreView::load(context).await?;
1307        let view = Arc::new(RwLock::new(view));
1308        Ok(Self { view })
1309    }
1310}