linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! We implement two types:
5//! 1) The first type `KeyValueStoreView` implements View and the function of `KeyValueStore`.
6//!
7//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
8//!    * The `Clone` trait
9//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
10//!    * a `write_batch` that writes in the context instead of writing of the view.
11//!
12//! Currently, that second type is only used for tests.
13//!
14//! Key tags to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24    batch::{Batch, WriteOperation},
25    common::{
26        from_bytes_option, from_bytes_option_or_default, get_key_range_for_prefix, get_upper_bound,
27        DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28    },
29    context::Context,
30    hashable_wrapper::WrappedHashableContainerView,
31    historical_hash_wrapper::HistoricallyHashableView,
32    map_view::ByteMapView,
33    store::ReadableKeyValueStore,
34    views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
35};
36
37#[cfg(with_metrics)]
38mod metrics {
39    use std::sync::LazyLock;
40
41    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
42    use prometheus::HistogramVec;
43
44    /// The latency of hash computation
45    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
46        register_histogram_vec(
47            "key_value_store_view_hash_latency",
48            "KeyValueStoreView hash latency",
49            &[],
50            exponential_bucket_latencies(5.0),
51        )
52    });
53
54    /// The latency of get operation
55    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
56        register_histogram_vec(
57            "key_value_store_view_get_latency",
58            "KeyValueStoreView get latency",
59            &[],
60            exponential_bucket_latencies(5.0),
61        )
62    });
63
64    /// The latency of multi get
65    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
66        LazyLock::new(|| {
67            register_histogram_vec(
68                "key_value_store_view_multi_get_latency",
69                "KeyValueStoreView multi get latency",
70                &[],
71                exponential_bucket_latencies(5.0),
72            )
73        });
74
75    /// The latency of contains key
76    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
77        LazyLock::new(|| {
78            register_histogram_vec(
79                "key_value_store_view_contains_key_latency",
80                "KeyValueStoreView contains key latency",
81                &[],
82                exponential_bucket_latencies(5.0),
83            )
84        });
85
86    /// The latency of contains keys
87    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
88        LazyLock::new(|| {
89            register_histogram_vec(
90                "key_value_store_view_contains_keys_latency",
91                "KeyValueStoreView contains keys latency",
92                &[],
93                exponential_bucket_latencies(5.0),
94            )
95        });
96
97    /// The latency of find keys by prefix operation
98    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
99        LazyLock::new(|| {
100            register_histogram_vec(
101                "key_value_store_view_find_keys_by_prefix_latency",
102                "KeyValueStoreView find keys by prefix latency",
103                &[],
104                exponential_bucket_latencies(5.0),
105            )
106        });
107
108    /// The latency of find key values by prefix operation
109    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
110        LazyLock::new(|| {
111            register_histogram_vec(
112                "key_value_store_view_find_key_values_by_prefix_latency",
113                "KeyValueStoreView find key values by prefix latency",
114                &[],
115                exponential_bucket_latencies(5.0),
116            )
117        });
118
119    /// The latency of write batch operation
120    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
121        LazyLock::new(|| {
122            register_histogram_vec(
123                "key_value_store_view_write_batch_latency",
124                "KeyValueStoreView write batch latency",
125                &[],
126                exponential_bucket_latencies(5.0),
127            )
128        });
129}
130
131#[cfg(with_testing)]
132use {
133    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
134    async_lock::RwLock,
135    std::sync::Arc,
136    thiserror::Error,
137};
138
139#[repr(u8)]
140enum KeyTag {
141    /// Prefix for the indices of the view.
142    Index = MIN_VIEW_TAG,
143    /// The total stored size
144    TotalSize,
145    /// The prefix where the sizes are being stored
146    Sizes,
147    /// Prefix for the hash.
148    Hash,
149}
150
151/// A pair containing the key and value size.
152#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
153pub struct SizeData {
154    /// The size of the key
155    pub key: u32,
156    /// The size of the value
157    pub value: u32,
158}
159
160impl SizeData {
161    /// Sums both terms
162    pub fn sum(&mut self) -> u32 {
163        self.key + self.value
164    }
165
166    /// Adds a size to `self`
167    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
168        self.key = self
169            .key
170            .checked_add(size.key)
171            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
172        self.value = self
173            .value
174            .checked_add(size.value)
175            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
176        Ok(())
177    }
178
179    /// Subtracts a size from `self`
180    pub fn sub_assign(&mut self, size: SizeData) {
181        self.key -= size.key;
182        self.value -= size.value;
183    }
184}
185
186/// A view that represents the functions of `KeyValueStore`.
187///
188/// Comment on the data set:
189/// In order to work, the view needs to store the updates and deleted prefixes.
190/// The updates and deleted prefixes have to be coherent. This means:
191/// * If an index is deleted by one in deleted prefixes then it should not be present
192///   in the updates at all.
193/// * [`DeletePrefix::key_prefix`][entry1] should not dominate anyone. That is if we have `[0,2]`
194///   then we should not have `[0,2,3]` since it would be dominated by the preceding.
195///
196/// With that we have:
197/// * in order to test if an `index` is deleted by a prefix we compute the highest deleted prefix `dp`
198///   such that `dp <= index`.
199///   If `dp` is indeed a prefix then we conclude that `index` is deleted, otherwise not.
200///   The no domination is essential here.
201///
202/// [entry1]: crate::batch::WriteOperation::DeletePrefix
203#[derive(Debug)]
204pub struct KeyValueStoreView<C> {
205    context: C,
206    deletion_set: DeletionSet,
207    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
208    stored_total_size: SizeData,
209    total_size: SizeData,
210    sizes: ByteMapView<C, u32>,
211    stored_hash: Option<HasherOutput>,
212    hash: Mutex<Option<HasherOutput>>,
213}
214
215impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
216    type Target = KeyValueStoreView<C2>;
217
218    async fn with_context(
219        &mut self,
220        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
221    ) -> Self::Target {
222        let hash = *self.hash.lock().unwrap();
223        KeyValueStoreView {
224            context: ctx.clone()(self.context()),
225            deletion_set: self.deletion_set.clone(),
226            updates: self.updates.clone(),
227            stored_total_size: self.stored_total_size,
228            total_size: self.total_size,
229            sizes: self.sizes.with_context(ctx.clone()).await,
230            stored_hash: self.stored_hash,
231            hash: Mutex::new(hash),
232        }
233    }
234}
235
236impl<C: Context> View for KeyValueStoreView<C> {
237    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;
238
239    type Context = C;
240
241    fn context(&self) -> &C {
242        &self.context
243    }
244
245    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
246        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
247        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
248        let mut v = vec![key_hash, key_total_size];
249        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
250        let context_sizes = context.clone_with_base_key(base_key);
251        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
252        Ok(v)
253    }
254
255    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
256        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
257        let total_size =
258            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
259        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
260        let context_sizes = context.clone_with_base_key(base_key);
261        let sizes = ByteMapView::post_load(
262            context_sizes,
263            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
264        )?;
265        Ok(Self {
266            context,
267            deletion_set: DeletionSet::new(),
268            updates: BTreeMap::new(),
269            stored_total_size: total_size,
270            total_size,
271            sizes,
272            stored_hash: hash,
273            hash: Mutex::new(hash),
274        })
275    }
276
277    fn rollback(&mut self) {
278        self.deletion_set.rollback();
279        self.updates.clear();
280        self.total_size = self.stored_total_size;
281        self.sizes.rollback();
282        *self.hash.get_mut().unwrap() = self.stored_hash;
283    }
284
285    async fn has_pending_changes(&self) -> bool {
286        if self.deletion_set.has_pending_changes() {
287            return true;
288        }
289        if !self.updates.is_empty() {
290            return true;
291        }
292        if self.stored_total_size != self.total_size {
293            return true;
294        }
295        if self.sizes.has_pending_changes().await {
296            return true;
297        }
298        let hash = self.hash.lock().unwrap();
299        self.stored_hash != *hash
300    }
301
302    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
303        let mut delete_view = false;
304        if self.deletion_set.delete_storage_first {
305            delete_view = true;
306            self.stored_total_size = SizeData::default();
307            batch.delete_key_prefix(self.context.base_key().bytes.clone());
308            for (index, update) in mem::take(&mut self.updates) {
309                if let Update::Set(value) = update {
310                    let key = self
311                        .context
312                        .base_key()
313                        .base_tag_index(KeyTag::Index as u8, &index);
314                    batch.put_key_value_bytes(key, value);
315                    delete_view = false;
316                }
317            }
318            self.stored_hash = None
319        } else {
320            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
321                let key = self
322                    .context
323                    .base_key()
324                    .base_tag_index(KeyTag::Index as u8, &index);
325                batch.delete_key_prefix(key);
326            }
327            for (index, update) in mem::take(&mut self.updates) {
328                let key = self
329                    .context
330                    .base_key()
331                    .base_tag_index(KeyTag::Index as u8, &index);
332                match update {
333                    Update::Removed => batch.delete_key(key),
334                    Update::Set(value) => batch.put_key_value_bytes(key, value),
335                }
336            }
337        }
338        self.sizes.flush(batch)?;
339        let hash = *self.hash.get_mut().unwrap();
340        if self.stored_hash != hash {
341            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
342            match hash {
343                None => batch.delete_key(key),
344                Some(hash) => batch.put_key_value(key, &hash)?,
345            }
346            self.stored_hash = hash;
347        }
348        if self.stored_total_size != self.total_size {
349            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
350            batch.put_key_value(key, &self.total_size)?;
351            self.stored_total_size = self.total_size;
352        }
353        self.deletion_set.delete_storage_first = false;
354        Ok(delete_view)
355    }
356
357    fn clear(&mut self) {
358        self.deletion_set.clear();
359        self.updates.clear();
360        self.total_size = SizeData::default();
361        self.sizes.clear();
362        *self.hash.get_mut().unwrap() = None;
363    }
364}
365
366impl<C: Context> ClonableView for KeyValueStoreView<C> {
367    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
368        Ok(KeyValueStoreView {
369            context: self.context.clone(),
370            deletion_set: self.deletion_set.clone(),
371            updates: self.updates.clone(),
372            stored_total_size: self.stored_total_size,
373            total_size: self.total_size,
374            sizes: self.sizes.clone_unchecked()?,
375            stored_hash: self.stored_hash,
376            hash: Mutex::new(*self.hash.get_mut().unwrap()),
377        })
378    }
379}
380
381impl<C: Context> KeyValueStoreView<C> {
382    fn max_key_size(&self) -> usize {
383        let prefix_len = self.context.base_key().bytes.len();
384        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
385    }
386
387    /// Getting the total sizes that will be used for keys and values when stored
388    /// ```rust
389    /// # tokio_test::block_on(async {
390    /// # use linera_views::context::MemoryContext;
391    /// # use linera_views::key_value_store_view::{KeyValueStoreView, SizeData};
392    /// # use linera_views::views::View;
393    /// # let context = MemoryContext::new_for_testing(());
394    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
395    /// let total_size = view.total_size();
396    /// assert_eq!(total_size, SizeData::default());
397    /// # })
398    /// ```
399    pub fn total_size(&self) -> SizeData {
400        self.total_size
401    }
402
403    /// Applies the function f over all indices. If the function f returns
404    /// false, then the loop ends prematurely.
405    /// ```rust
406    /// # tokio_test::block_on(async {
407    /// # use linera_views::context::MemoryContext;
408    /// # use linera_views::key_value_store_view::KeyValueStoreView;
409    /// # use linera_views::views::View;
410    /// # let context = MemoryContext::new_for_testing(());
411    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
412    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
413    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
414    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
415    /// let mut count = 0;
416    /// view.for_each_index_while(|_key| {
417    ///     count += 1;
418    ///     Ok(count < 2)
419    /// })
420    /// .await
421    /// .unwrap();
422    /// assert_eq!(count, 2);
423    /// # })
424    /// ```
425    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
426    where
427        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
428    {
429        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
430        let mut updates = self.updates.iter();
431        let mut update = updates.next();
432        if !self.deletion_set.delete_storage_first {
433            let mut suffix_closed_set =
434                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
435            for index in self
436                .context
437                .store()
438                .find_keys_by_prefix(&key_prefix)
439                .await?
440            {
441                loop {
442                    match update {
443                        Some((key, value)) if key <= &index => {
444                            if let Update::Set(_) = value {
445                                if !f(key)? {
446                                    return Ok(());
447                                }
448                            }
449                            update = updates.next();
450                            if key == &index {
451                                break;
452                            }
453                        }
454                        _ => {
455                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
456                                return Ok(());
457                            }
458                            break;
459                        }
460                    }
461                }
462            }
463        }
464        while let Some((key, value)) = update {
465            if let Update::Set(_) = value {
466                if !f(key)? {
467                    return Ok(());
468                }
469            }
470            update = updates.next();
471        }
472        Ok(())
473    }
474
475    /// Applies the function f over all indices.
476    /// ```rust
477    /// # tokio_test::block_on(async {
478    /// # use linera_views::context::MemoryContext;
479    /// # use linera_views::key_value_store_view::KeyValueStoreView;
480    /// # use linera_views::views::View;
481    /// # let context = MemoryContext::new_for_testing(());
482    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
483    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
484    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
485    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
486    /// let mut count = 0;
487    /// view.for_each_index(|_key| {
488    ///     count += 1;
489    ///     Ok(())
490    /// })
491    /// .await
492    /// .unwrap();
493    /// assert_eq!(count, 3);
494    /// # })
495    /// ```
496    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
497    where
498        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
499    {
500        self.for_each_index_while(|key| {
501            f(key)?;
502            Ok(true)
503        })
504        .await
505    }
506
507    /// Applies the function f over all index/value pairs.
508    /// If the function f returns false then the loop ends prematurely.
509    /// ```rust
510    /// # tokio_test::block_on(async {
511    /// # use linera_views::context::MemoryContext;
512    /// # use linera_views::key_value_store_view::KeyValueStoreView;
513    /// # use linera_views::views::View;
514    /// # let context = MemoryContext::new_for_testing(());
515    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
516    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
517    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
518    /// let mut values = Vec::new();
519    /// view.for_each_index_value_while(|_key, value| {
520    ///     values.push(value.to_vec());
521    ///     Ok(values.len() < 1)
522    /// })
523    /// .await
524    /// .unwrap();
525    /// assert_eq!(values, vec![vec![0]]);
526    /// # })
527    /// ```
528    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
529    where
530        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
531    {
532        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
533        let mut updates = self.updates.iter();
534        let mut update = updates.next();
535        if !self.deletion_set.delete_storage_first {
536            let mut suffix_closed_set =
537                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
538            for entry in self
539                .context
540                .store()
541                .find_key_values_by_prefix(&key_prefix)
542                .await?
543            {
544                let (index, index_val) = entry;
545                loop {
546                    match update {
547                        Some((key, value)) if key <= &index => {
548                            if let Update::Set(value) = value {
549                                if !f(key, value)? {
550                                    return Ok(());
551                                }
552                            }
553                            update = updates.next();
554                            if key == &index {
555                                break;
556                            }
557                        }
558                        _ => {
559                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
560                                return Ok(());
561                            }
562                            break;
563                        }
564                    }
565                }
566            }
567        }
568        while let Some((key, value)) = update {
569            if let Update::Set(value) = value {
570                if !f(key, value)? {
571                    return Ok(());
572                }
573            }
574            update = updates.next();
575        }
576        Ok(())
577    }
578
579    /// Applies the function f over all index/value pairs.
580    /// ```rust
581    /// # tokio_test::block_on(async {
582    /// # use linera_views::context::MemoryContext;
583    /// # use linera_views::key_value_store_view::KeyValueStoreView;
584    /// # use linera_views::views::View;
585    /// # let context = MemoryContext::new_for_testing(());
586    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
587    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
588    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
589    /// let mut part_keys = Vec::new();
590    /// view.for_each_index_while(|key| {
591    ///     part_keys.push(key.to_vec());
592    ///     Ok(part_keys.len() < 1)
593    /// })
594    /// .await
595    /// .unwrap();
596    /// assert_eq!(part_keys, vec![vec![0, 1]]);
597    /// # })
598    /// ```
599    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
600    where
601        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
602    {
603        self.for_each_index_value_while(|key, value| {
604            f(key, value)?;
605            Ok(true)
606        })
607        .await
608    }
609
610    /// Returns the list of indices in lexicographic order.
611    /// ```rust
612    /// # tokio_test::block_on(async {
613    /// # use linera_views::context::MemoryContext;
614    /// # use linera_views::key_value_store_view::KeyValueStoreView;
615    /// # use linera_views::views::View;
616    /// # let context = MemoryContext::new_for_testing(());
617    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
618    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
619    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
620    /// let indices = view.indices().await.unwrap();
621    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
622    /// # })
623    /// ```
624    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
625        let mut indices = Vec::new();
626        self.for_each_index(|index| {
627            indices.push(index.to_vec());
628            Ok(())
629        })
630        .await?;
631        Ok(indices)
632    }
633
634    /// Returns the list of indices and values in lexicographic order.
635    /// ```rust
636    /// # tokio_test::block_on(async {
637    /// # use linera_views::context::MemoryContext;
638    /// # use linera_views::key_value_store_view::KeyValueStoreView;
639    /// # use linera_views::views::View;
640    /// # let context = MemoryContext::new_for_testing(());
641    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
642    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
643    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
644    /// let key_values = view.indices().await.unwrap();
645    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
646    /// # })
647    /// ```
648    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
649        let mut index_values = Vec::new();
650        self.for_each_index_value(|index, value| {
651            index_values.push((index.to_vec(), value.to_vec()));
652            Ok(())
653        })
654        .await?;
655        Ok(index_values)
656    }
657
658    /// Returns the number of entries.
659    /// ```rust
660    /// # tokio_test::block_on(async {
661    /// # use linera_views::context::MemoryContext;
662    /// # use linera_views::key_value_store_view::KeyValueStoreView;
663    /// # use linera_views::views::View;
664    /// # let context = MemoryContext::new_for_testing(());
665    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
666    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
667    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
668    /// let count = view.count().await.unwrap();
669    /// assert_eq!(count, 2);
670    /// # })
671    /// ```
672    pub async fn count(&self) -> Result<usize, ViewError> {
673        let mut count = 0;
674        self.for_each_index(|_index| {
675            count += 1;
676            Ok(())
677        })
678        .await?;
679        Ok(count)
680    }
681
682    /// Obtains the value at the given index, if any.
683    /// ```rust
684    /// # tokio_test::block_on(async {
685    /// # use linera_views::context::MemoryContext;
686    /// # use linera_views::key_value_store_view::KeyValueStoreView;
687    /// # use linera_views::views::View;
688    /// # let context = MemoryContext::new_for_testing(());
689    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
690    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
691    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
692    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
693    /// # })
694    /// ```
695    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
696        #[cfg(with_metrics)]
697        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
698        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
699        if let Some(update) = self.updates.get(index) {
700            let value = match update {
701                Update::Removed => None,
702                Update::Set(value) => Some(value.clone()),
703            };
704            return Ok(value);
705        }
706        if self.deletion_set.contains_prefix_of(index) {
707            return Ok(None);
708        }
709        let key = self
710            .context
711            .base_key()
712            .base_tag_index(KeyTag::Index as u8, index);
713        Ok(self.context.store().read_value_bytes(&key).await?)
714    }
715
716    /// Tests whether the store contains a specific index.
717    /// ```rust
718    /// # tokio_test::block_on(async {
719    /// # use linera_views::context::MemoryContext;
720    /// # use linera_views::key_value_store_view::KeyValueStoreView;
721    /// # use linera_views::views::View;
722    /// # let context = MemoryContext::new_for_testing(());
723    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
724    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
725    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
726    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
727    /// # })
728    /// ```
729    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
730        #[cfg(with_metrics)]
731        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
732        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
733        if let Some(update) = self.updates.get(index) {
734            let test = match update {
735                Update::Removed => false,
736                Update::Set(_value) => true,
737            };
738            return Ok(test);
739        }
740        if self.deletion_set.contains_prefix_of(index) {
741            return Ok(false);
742        }
743        let key = self
744            .context
745            .base_key()
746            .base_tag_index(KeyTag::Index as u8, index);
747        Ok(self.context.store().contains_key(&key).await?)
748    }
749
750    /// Tests whether the view contains a range of indices
751    /// ```rust
752    /// # tokio_test::block_on(async {
753    /// # use linera_views::context::MemoryContext;
754    /// # use linera_views::key_value_store_view::KeyValueStoreView;
755    /// # use linera_views::views::View;
756    /// # let context = MemoryContext::new_for_testing(());
757    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
758    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
759    /// let keys = vec![vec![0, 1], vec![0, 2]];
760    /// let results = view.contains_keys(keys).await.unwrap();
761    /// assert_eq!(results, vec![true, false]);
762    /// # })
763    /// ```
764    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
765        #[cfg(with_metrics)]
766        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
767        let mut results = Vec::with_capacity(indices.len());
768        let mut missed_indices = Vec::new();
769        let mut vector_query = Vec::new();
770        for (i, index) in indices.into_iter().enumerate() {
771            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
772            if let Some(update) = self.updates.get(&index) {
773                let value = match update {
774                    Update::Removed => false,
775                    Update::Set(_) => true,
776                };
777                results.push(value);
778            } else {
779                results.push(false);
780                if !self.deletion_set.contains_prefix_of(&index) {
781                    missed_indices.push(i);
782                    let key = self
783                        .context
784                        .base_key()
785                        .base_tag_index(KeyTag::Index as u8, &index);
786                    vector_query.push(key);
787                }
788            }
789        }
790        let values = self.context.store().contains_keys(vector_query).await?;
791        for (i, value) in missed_indices.into_iter().zip(values) {
792            results[i] = value;
793        }
794        Ok(results)
795    }
796
797    /// Obtains the values of a range of indices
798    /// ```rust
799    /// # tokio_test::block_on(async {
800    /// # use linera_views::context::MemoryContext;
801    /// # use linera_views::key_value_store_view::KeyValueStoreView;
802    /// # use linera_views::views::View;
803    /// # let context = MemoryContext::new_for_testing(());
804    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
805    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
806    /// assert_eq!(
807    ///     view.multi_get(vec![vec![0, 1], vec![0, 2]]).await.unwrap(),
808    ///     vec![Some(vec![42]), None]
809    /// );
810    /// # })
811    /// ```
812    pub async fn multi_get(
813        &self,
814        indices: Vec<Vec<u8>>,
815    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
816        #[cfg(with_metrics)]
817        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
818        let mut result = Vec::with_capacity(indices.len());
819        let mut missed_indices = Vec::new();
820        let mut vector_query = Vec::new();
821        for (i, index) in indices.into_iter().enumerate() {
822            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
823            if let Some(update) = self.updates.get(&index) {
824                let value = match update {
825                    Update::Removed => None,
826                    Update::Set(value) => Some(value.clone()),
827                };
828                result.push(value);
829            } else {
830                result.push(None);
831                if !self.deletion_set.contains_prefix_of(&index) {
832                    missed_indices.push(i);
833                    let key = self
834                        .context
835                        .base_key()
836                        .base_tag_index(KeyTag::Index as u8, &index);
837                    vector_query.push(key);
838                }
839            }
840        }
841        let values = self
842            .context
843            .store()
844            .read_multi_values_bytes(vector_query)
845            .await?;
846        for (i, value) in missed_indices.into_iter().zip(values) {
847            result[i] = value;
848        }
849        Ok(result)
850    }
851
852    /// Applies the given batch of `crate::common::WriteOperation`.
853    /// ```rust
854    /// # tokio_test::block_on(async {
855    /// # use linera_views::context::MemoryContext;
856    /// # use linera_views::key_value_store_view::KeyValueStoreView;
857    /// # use linera_views::batch::Batch;
858    /// # use linera_views::views::View;
859    /// # let context = MemoryContext::new_for_testing(());
860    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
861    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
862    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
863    /// let mut batch = Batch::new();
864    /// batch.delete_key_prefix(vec![0]);
865    /// view.write_batch(batch).await.unwrap();
866    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
867    /// assert_eq!(key_values, vec![]);
868    /// # })
869    /// ```
870    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
871        #[cfg(with_metrics)]
872        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
873        *self.hash.get_mut().unwrap() = None;
874        let max_key_size = self.max_key_size();
875        for operation in batch.operations {
876            match operation {
877                WriteOperation::Delete { key } => {
878                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
879                    if let Some(value) = self.sizes.get(&key).await? {
880                        let entry_size = SizeData {
881                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
882                            value,
883                        };
884                        self.total_size.sub_assign(entry_size);
885                    }
886                    self.sizes.remove(key.clone());
887                    if self.deletion_set.contains_prefix_of(&key) {
888                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
889                        self.updates.remove(&key);
890                    } else {
891                        self.updates.insert(key, Update::Removed);
892                    }
893                }
894                WriteOperation::Put { key, value } => {
895                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
896                    let entry_size = SizeData {
897                        key: key.len() as u32,
898                        value: value.len() as u32,
899                    };
900                    self.total_size.add_assign(entry_size)?;
901                    if let Some(value) = self.sizes.get(&key).await? {
902                        let entry_size = SizeData {
903                            key: key.len() as u32,
904                            value,
905                        };
906                        self.total_size.sub_assign(entry_size);
907                    }
908                    self.sizes.insert(key.clone(), entry_size.value);
909                    self.updates.insert(key, Update::Set(value));
910                }
911                WriteOperation::DeletePrefix { key_prefix } => {
912                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
913                    let key_list = self
914                        .updates
915                        .range(get_key_range_for_prefix(key_prefix.clone()))
916                        .map(|x| x.0.to_vec())
917                        .collect::<Vec<_>>();
918                    for key in key_list {
919                        self.updates.remove(&key);
920                    }
921                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
922                    for (key, value) in key_values {
923                        let entry_size = SizeData {
924                            key: key.len() as u32,
925                            value,
926                        };
927                        self.total_size.sub_assign(entry_size);
928                        self.sizes.remove(key);
929                    }
930                    self.sizes.remove_by_prefix(key_prefix.clone());
931                    self.deletion_set.insert_key_prefix(key_prefix);
932                }
933            }
934        }
935        Ok(())
936    }
937
938    /// Sets or inserts a value.
939    /// ```rust
940    /// # tokio_test::block_on(async {
941    /// # use linera_views::context::MemoryContext;
942    /// # use linera_views::key_value_store_view::KeyValueStoreView;
943    /// # use linera_views::views::View;
944    /// # let context = MemoryContext::new_for_testing(());
945    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
946    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
947    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
948    /// # })
949    /// ```
950    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
951        let mut batch = Batch::new();
952        batch.put_key_value_bytes(index, value);
953        self.write_batch(batch).await
954    }
955
956    /// Removes a value. If absent then the action has no effect.
957    /// ```rust
958    /// # tokio_test::block_on(async {
959    /// # use linera_views::context::MemoryContext;
960    /// # use linera_views::key_value_store_view::KeyValueStoreView;
961    /// # use linera_views::views::View;
962    /// # let context = MemoryContext::new_for_testing(());
963    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
964    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
965    /// view.remove(vec![0, 1]).await.unwrap();
966    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
967    /// # })
968    /// ```
969    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
970        let mut batch = Batch::new();
971        batch.delete_key(index);
972        self.write_batch(batch).await
973    }
974
975    /// Deletes a key prefix.
976    /// ```rust
977    /// # tokio_test::block_on(async {
978    /// # use linera_views::context::MemoryContext;
979    /// # use linera_views::key_value_store_view::KeyValueStoreView;
980    /// # use linera_views::views::View;
981    /// # let context = MemoryContext::new_for_testing(());
982    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
983    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
984    /// view.remove_by_prefix(vec![0]).await.unwrap();
985    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
986    /// # })
987    /// ```
988    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
989        let mut batch = Batch::new();
990        batch.delete_key_prefix(key_prefix);
991        self.write_batch(batch).await
992    }
993
994    /// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
995    /// ```rust
996    /// # tokio_test::block_on(async {
997    /// # use linera_views::context::MemoryContext;
998    /// # use linera_views::key_value_store_view::KeyValueStoreView;
999    /// # use linera_views::views::View;
1000    /// # let context = MemoryContext::new_for_testing(());
1001    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1002    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1003    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1004    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
1005    /// assert_eq!(keys, vec![vec![1]]);
1006    /// # })
1007    /// ```
1008    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
1009        #[cfg(with_metrics)]
1010        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
1011        ensure!(
1012            key_prefix.len() <= self.max_key_size(),
1013            ViewError::KeyTooLong
1014        );
1015        let len = key_prefix.len();
1016        let key_prefix_full = self
1017            .context
1018            .base_key()
1019            .base_tag_index(KeyTag::Index as u8, key_prefix);
1020        let mut keys = Vec::new();
1021        let key_prefix_upper = get_upper_bound(key_prefix);
1022        let mut updates = self
1023            .updates
1024            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1025        let mut update = updates.next();
1026        if !self.deletion_set.delete_storage_first {
1027            let mut suffix_closed_set =
1028                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1029            for key in self
1030                .context
1031                .store()
1032                .find_keys_by_prefix(&key_prefix_full)
1033                .await?
1034            {
1035                loop {
1036                    match update {
1037                        Some((update_key, update_value))
1038                            if &update_key[len..] <= key.as_slice() =>
1039                        {
1040                            if let Update::Set(_) = update_value {
1041                                keys.push(update_key[len..].to_vec());
1042                            }
1043                            update = updates.next();
1044                            if update_key[len..] == key[..] {
1045                                break;
1046                            }
1047                        }
1048                        _ => {
1049                            let mut key_with_prefix = key_prefix.to_vec();
1050                            key_with_prefix.extend_from_slice(&key);
1051                            if !suffix_closed_set.find_key(&key_with_prefix) {
1052                                keys.push(key);
1053                            }
1054                            break;
1055                        }
1056                    }
1057                }
1058            }
1059        }
1060        while let Some((update_key, update_value)) = update {
1061            if let Update::Set(_) = update_value {
1062                let update_key = update_key[len..].to_vec();
1063                keys.push(update_key);
1064            }
1065            update = updates.next();
1066        }
1067        Ok(keys)
1068    }
1069
1070    /// Iterates over all the key-value pairs, for keys matching the given prefix. The
1071    /// prefix is not included in the returned keys.
1072    /// ```rust
1073    /// # tokio_test::block_on(async {
1074    /// # use linera_views::context::MemoryContext;
1075    /// # use linera_views::key_value_store_view::KeyValueStoreView;
1076    /// # use linera_views::views::View;
1077    /// # let context = MemoryContext::new_for_testing(());
1078    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1079    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1080    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1081    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
1082    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
1083    /// # })
1084    /// ```
1085    pub async fn find_key_values_by_prefix(
1086        &self,
1087        key_prefix: &[u8],
1088    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
1089        #[cfg(with_metrics)]
1090        let _latency =
1091            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
1092        ensure!(
1093            key_prefix.len() <= self.max_key_size(),
1094            ViewError::KeyTooLong
1095        );
1096        let len = key_prefix.len();
1097        let key_prefix_full = self
1098            .context
1099            .base_key()
1100            .base_tag_index(KeyTag::Index as u8, key_prefix);
1101        let mut key_values = Vec::new();
1102        let key_prefix_upper = get_upper_bound(key_prefix);
1103        let mut updates = self
1104            .updates
1105            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1106        let mut update = updates.next();
1107        if !self.deletion_set.delete_storage_first {
1108            let mut suffix_closed_set =
1109                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1110            for entry in self
1111                .context
1112                .store()
1113                .find_key_values_by_prefix(&key_prefix_full)
1114                .await?
1115            {
1116                let (key, value) = entry;
1117                loop {
1118                    match update {
1119                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
1120                            if let Update::Set(update_value) = update_value {
1121                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1122                                key_values.push(key_value);
1123                            }
1124                            update = updates.next();
1125                            if update_key[len..] == key[..] {
1126                                break;
1127                            }
1128                        }
1129                        _ => {
1130                            let mut key_with_prefix = key_prefix.to_vec();
1131                            key_with_prefix.extend_from_slice(&key);
1132                            if !suffix_closed_set.find_key(&key_with_prefix) {
1133                                key_values.push((key, value));
1134                            }
1135                            break;
1136                        }
1137                    }
1138                }
1139            }
1140        }
1141        while let Some((update_key, update_value)) = update {
1142            if let Update::Set(update_value) = update_value {
1143                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1144                key_values.push(key_value);
1145            }
1146            update = updates.next();
1147        }
1148        Ok(key_values)
1149    }
1150
1151    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1152        #[cfg(with_metrics)]
1153        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1154        let mut hasher = sha3::Sha3_256::default();
1155        let mut count = 0u32;
1156        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1157            count += 1;
1158            hasher.update_with_bytes(index)?;
1159            hasher.update_with_bytes(value)?;
1160            Ok(())
1161        })
1162        .await?;
1163        hasher.update_with_bcs_bytes(&count)?;
1164        Ok(hasher.finalize())
1165    }
1166}
1167
1168impl<C: Context> HashableView for KeyValueStoreView<C> {
1169    type Hasher = sha3::Sha3_256;
1170
1171    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1172        let hash = *self.hash.get_mut().unwrap();
1173        match hash {
1174            Some(hash) => Ok(hash),
1175            None => {
1176                let new_hash = self.compute_hash().await?;
1177                let hash = self.hash.get_mut().unwrap();
1178                *hash = Some(new_hash);
1179                Ok(new_hash)
1180            }
1181        }
1182    }
1183
1184    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1185        let hash = *self.hash.lock().unwrap();
1186        match hash {
1187            Some(hash) => Ok(hash),
1188            None => {
1189                let new_hash = self.compute_hash().await?;
1190                let mut hash = self.hash.lock().unwrap();
1191                *hash = Some(new_hash);
1192                Ok(new_hash)
1193            }
1194        }
1195    }
1196}
1197
1198/// Type wrapping `KeyValueStoreView` while memoizing the hash.
1199pub type HashedKeyValueStoreView<C> =
1200    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;
1201
1202/// Wrapper around `KeyValueStoreView` to compute hashes based on the history of changes.
1203pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1204
1205/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
1206#[cfg(with_testing)]
1207#[derive(Debug, Clone)]
1208pub struct ViewContainer<C> {
1209    view: Arc<RwLock<KeyValueStoreView<C>>>,
1210}
1211
1212#[cfg(with_testing)]
1213impl<C> WithError for ViewContainer<C> {
1214    type Error = ViewContainerError;
1215}
1216
1217#[cfg(with_testing)]
1218/// The error type for [`ViewContainer`] operations.
1219#[derive(Error, Debug)]
1220pub enum ViewContainerError {
1221    /// View error.
1222    #[error(transparent)]
1223    ViewError(#[from] ViewError),
1224
1225    /// BCS serialization error.
1226    #[error(transparent)]
1227    BcsError(#[from] bcs::Error),
1228}
1229
1230#[cfg(with_testing)]
1231impl KeyValueStoreError for ViewContainerError {
1232    const BACKEND: &'static str = "view_container";
1233}
1234
1235#[cfg(with_testing)]
1236impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
1237    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
1238
1239    fn max_stream_queries(&self) -> usize {
1240        1
1241    }
1242
1243    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
1244        let view = self.view.read().await;
1245        Ok(view.get(key).await?)
1246    }
1247
1248    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
1249        let view = self.view.read().await;
1250        Ok(view.contains_key(key).await?)
1251    }
1252
1253    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
1254        let view = self.view.read().await;
1255        Ok(view.contains_keys(keys).await?)
1256    }
1257
1258    async fn read_multi_values_bytes(
1259        &self,
1260        keys: Vec<Vec<u8>>,
1261    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
1262        let view = self.view.read().await;
1263        Ok(view.multi_get(keys).await?)
1264    }
1265
1266    async fn find_keys_by_prefix(
1267        &self,
1268        key_prefix: &[u8],
1269    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
1270        let view = self.view.read().await;
1271        Ok(view.find_keys_by_prefix(key_prefix).await?)
1272    }
1273
1274    async fn find_key_values_by_prefix(
1275        &self,
1276        key_prefix: &[u8],
1277    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
1278        let view = self.view.read().await;
1279        Ok(view.find_key_values_by_prefix(key_prefix).await?)
1280    }
1281}
1282
1283#[cfg(with_testing)]
1284impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
1285    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;
1286
1287    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
1288        let mut view = self.view.write().await;
1289        view.write_batch(batch).await?;
1290        let mut batch = Batch::new();
1291        view.flush(&mut batch)?;
1292        view.context()
1293            .store()
1294            .write_batch(batch)
1295            .await
1296            .map_err(ViewError::from)?;
1297        Ok(())
1298    }
1299
1300    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
1301        Ok(())
1302    }
1303}
1304
1305#[cfg(with_testing)]
1306impl<C: Context> ViewContainer<C> {
1307    /// Creates a [`ViewContainer`].
1308    pub async fn new(context: C) -> Result<Self, ViewError> {
1309        let view = KeyValueStoreView::load(context).await?;
1310        let view = Arc::new(RwLock::new(view));
1311        Ok(Self { view })
1312    }
1313}