linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! We implement two types:
//! 1) The first type `KeyValueStoreView` implements `View` and the functions of `KeyValueStore`.
//!
//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
//!    * The `Clone` trait
//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
//!    * a `write_batch` that writes to the context instead of writing to the view.
//!
//! Currently, that second type is only used for tests.
//!
//! Key tags are used to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};
17
18use allocative::Allocative;
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{data_types::ArithmeticError, ensure, visit_allocative_simple};
22use serde::{Deserialize, Serialize};
23
24use crate::{
25    batch::{Batch, WriteOperation},
26    common::{
27        from_bytes_option, from_bytes_option_or_default, get_key_range_for_prefix, get_upper_bound,
28        DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
29    },
30    context::Context,
31    hashable_wrapper::WrappedHashableContainerView,
32    historical_hash_wrapper::HistoricallyHashableView,
33    map_view::ByteMapView,
34    store::ReadableKeyValueStore,
35    views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
36};
37
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a latency histogram without labels, using the exponential
    /// buckets shared by every metric of this module.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(5.0))
    }

    /// Latency of the hash computation.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
        )
    });

    /// Latency of the `get` operation.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
        )
    });

    /// Latency of the `multi_get` operation.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
            )
        });

    /// Latency of the `contains_key` operation.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
            )
        });

    /// Latency of the `contains_keys` operation.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
            )
        });

    /// Latency of the "find keys by prefix" operation.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
            )
        });

    /// Latency of the "find key values by prefix" operation.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
            )
        });

    /// Latency of the "write batch" operation.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
            )
        });
}
131
132#[cfg(with_testing)]
133use {
134    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
135    async_lock::RwLock,
136    std::sync::Arc,
137    thiserror::Error,
138};
139
140#[repr(u8)]
141enum KeyTag {
142    /// Prefix for the indices of the view.
143    Index = MIN_VIEW_TAG,
144    /// The total stored size
145    TotalSize,
146    /// The prefix where the sizes are being stored
147    Sizes,
148    /// Prefix for the hash.
149    Hash,
150}
151
/// A pair containing the key and value size.
///
/// Both components are tracked separately so that callers can inspect the
/// space taken by keys and by values independently; [`SizeData::sum`] gives
/// the combined figure.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Allocative)]
pub struct SizeData {
    /// The size of the key
    pub key: u32,
    /// The size of the value
    pub value: u32,
}
160
161impl SizeData {
162    /// Sums both terms
163    pub fn sum(&mut self) -> u32 {
164        self.key + self.value
165    }
166
167    /// Adds a size to `self`
168    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
169        self.key = self
170            .key
171            .checked_add(size.key)
172            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
173        self.value = self
174            .value
175            .checked_add(size.value)
176            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
177        Ok(())
178    }
179
180    /// Subtracts a size from `self`
181    pub fn sub_assign(&mut self, size: SizeData) {
182        self.key -= size.key;
183        self.value -= size.value;
184    }
185}
186
/// A view that represents the functions of `KeyValueStore`.
///
/// Comment on the data set:
/// In order to work, the view needs to store the updates and the deleted prefixes.
/// The updates and the deleted prefixes have to be coherent. This means:
/// * If an index is deleted by one of the deleted prefixes, then it should not be present
///   in the updates at all.
/// * A [`DeletePrefix::key_prefix`][entry1] should not dominate another one. That is, if we
///   have `[0,2]` then we should not also have `[0,2,3]`, since it would be dominated by the
///   former.
///
/// With that we have:
/// * In order to test whether an `index` is deleted by a prefix, we compute the highest deleted
///   prefix `dp` such that `dp <= index`.
///   If `dp` is indeed a prefix of `index` then we conclude that `index` is deleted, otherwise
///   it is not. The absence of domination is essential here.
///
/// [entry1]: crate::batch::WriteOperation::DeletePrefix
#[derive(Debug, Allocative)]
#[allocative(bound = "C")]
pub struct KeyValueStoreView<C> {
    /// The view context.
    #[allocative(skip)]
    context: C,
    /// Tracks deleted key prefixes.
    deletion_set: DeletionSet,
    /// Pending changes not yet persisted to storage.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// The total size of keys and values persisted in storage.
    stored_total_size: SizeData,
    /// The total size of keys and values including pending changes.
    total_size: SizeData,
    /// Map of key to value size for tracking storage usage.
    sizes: ByteMapView<C, u32>,
    /// The hash persisted in storage.
    #[allocative(visit = visit_allocative_simple)]
    stored_hash: Option<HasherOutput>,
    /// Memoized hash, if any.
    // NOTE(review): presumably behind a `Mutex` so that it can be refreshed through `&self`
    // — confirm against the hashing code.
    #[allocative(visit = visit_allocative_simple)]
    hash: Mutex<Option<HasherOutput>>,
}
227
228impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
229    type Target = KeyValueStoreView<C2>;
230
231    async fn with_context(
232        &mut self,
233        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
234    ) -> Self::Target {
235        let hash = *self.hash.lock().unwrap();
236        KeyValueStoreView {
237            context: ctx.clone()(&self.context),
238            deletion_set: self.deletion_set.clone(),
239            updates: self.updates.clone(),
240            stored_total_size: self.stored_total_size,
241            total_size: self.total_size,
242            sizes: self.sizes.with_context(ctx.clone()).await,
243            stored_hash: self.stored_hash,
244            hash: Mutex::new(hash),
245        }
246    }
247}
248
249impl<C: Context> View for KeyValueStoreView<C> {
250    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;
251
252    type Context = C;
253
254    fn context(&self) -> C {
255        self.context.clone()
256    }
257
258    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
259        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
260        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
261        let mut v = vec![key_hash, key_total_size];
262        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
263        let context_sizes = context.clone_with_base_key(base_key);
264        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
265        Ok(v)
266    }
267
268    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
269        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
270        let total_size =
271            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
272        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
273        let context_sizes = context.clone_with_base_key(base_key);
274        let sizes = ByteMapView::post_load(
275            context_sizes,
276            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
277        )?;
278        Ok(Self {
279            context,
280            deletion_set: DeletionSet::new(),
281            updates: BTreeMap::new(),
282            stored_total_size: total_size,
283            total_size,
284            sizes,
285            stored_hash: hash,
286            hash: Mutex::new(hash),
287        })
288    }
289
290    fn rollback(&mut self) {
291        self.deletion_set.rollback();
292        self.updates.clear();
293        self.total_size = self.stored_total_size;
294        self.sizes.rollback();
295        *self.hash.get_mut().unwrap() = self.stored_hash;
296    }
297
298    async fn has_pending_changes(&self) -> bool {
299        if self.deletion_set.has_pending_changes() {
300            return true;
301        }
302        if !self.updates.is_empty() {
303            return true;
304        }
305        if self.stored_total_size != self.total_size {
306            return true;
307        }
308        if self.sizes.has_pending_changes().await {
309            return true;
310        }
311        let hash = self.hash.lock().unwrap();
312        self.stored_hash != *hash
313    }
314
315    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
316        let mut delete_view = false;
317        if self.deletion_set.delete_storage_first {
318            delete_view = true;
319            batch.delete_key_prefix(self.context.base_key().bytes.clone());
320            for (index, update) in self.updates.iter() {
321                if let Update::Set(value) = update {
322                    let key = self
323                        .context
324                        .base_key()
325                        .base_tag_index(KeyTag::Index as u8, index);
326                    batch.put_key_value_bytes(key, value.clone());
327                    delete_view = false;
328                }
329            }
330        } else {
331            for index in self.deletion_set.deleted_prefixes.iter() {
332                let key = self
333                    .context
334                    .base_key()
335                    .base_tag_index(KeyTag::Index as u8, index);
336                batch.delete_key_prefix(key);
337            }
338            for (index, update) in self.updates.iter() {
339                let key = self
340                    .context
341                    .base_key()
342                    .base_tag_index(KeyTag::Index as u8, index);
343                match update {
344                    Update::Removed => batch.delete_key(key),
345                    Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
346                }
347            }
348        }
349        self.sizes.pre_save(batch)?;
350        let hash = *self.hash.lock().unwrap();
351        if self.stored_hash != hash {
352            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
353            match hash {
354                None => batch.delete_key(key),
355                Some(hash) => batch.put_key_value(key, &hash)?,
356            }
357        }
358        if self.stored_total_size != self.total_size {
359            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
360            batch.put_key_value(key, &self.total_size)?;
361        }
362        Ok(delete_view)
363    }
364
365    fn post_save(&mut self) {
366        self.deletion_set.delete_storage_first = false;
367        self.deletion_set.deleted_prefixes.clear();
368        self.updates.clear();
369        self.sizes.post_save();
370        let hash = *self.hash.lock().unwrap();
371        self.stored_hash = hash;
372        self.stored_total_size = self.total_size;
373    }
374
375    fn clear(&mut self) {
376        self.deletion_set.clear();
377        self.updates.clear();
378        self.total_size = SizeData::default();
379        self.sizes.clear();
380        *self.hash.get_mut().unwrap() = None;
381    }
382}
383
384impl<C: Context> ClonableView for KeyValueStoreView<C> {
385    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
386        Ok(KeyValueStoreView {
387            context: self.context.clone(),
388            deletion_set: self.deletion_set.clone(),
389            updates: self.updates.clone(),
390            stored_total_size: self.stored_total_size,
391            total_size: self.total_size,
392            sizes: self.sizes.clone_unchecked()?,
393            stored_hash: self.stored_hash,
394            hash: Mutex::new(*self.hash.get_mut().unwrap()),
395        })
396    }
397}
398
399impl<C: Context> KeyValueStoreView<C> {
400    fn max_key_size(&self) -> usize {
401        let prefix_len = self.context.base_key().bytes.len();
402        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
403    }
404
405    /// Getting the total sizes that will be used for keys and values when stored
406    /// ```rust
407    /// # tokio_test::block_on(async {
408    /// # use linera_views::context::MemoryContext;
409    /// # use linera_views::key_value_store_view::{KeyValueStoreView, SizeData};
410    /// # use linera_views::views::View;
411    /// # let context = MemoryContext::new_for_testing(());
412    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
413    /// view.insert(vec![0, 1], vec![0, 1, 2, 3, 4]).await.unwrap();
414    /// let total_size = view.total_size();
415    /// assert_eq!(total_size, SizeData { key: 2, value: 5 });
416    /// # })
417    /// ```
418    pub fn total_size(&self) -> SizeData {
419        self.total_size
420    }
421
422    /// Applies the function f over all indices. If the function f returns
423    /// false, then the loop ends prematurely.
424    /// ```rust
425    /// # tokio_test::block_on(async {
426    /// # use linera_views::context::MemoryContext;
427    /// # use linera_views::key_value_store_view::KeyValueStoreView;
428    /// # use linera_views::views::View;
429    /// # let context = MemoryContext::new_for_testing(());
430    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
431    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
432    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
433    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
434    /// let mut count = 0;
435    /// view.for_each_index_while(|_key| {
436    ///     count += 1;
437    ///     Ok(count < 2)
438    /// })
439    /// .await
440    /// .unwrap();
441    /// assert_eq!(count, 2);
442    /// # })
443    /// ```
444    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
445    where
446        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
447    {
448        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
449        let mut updates = self.updates.iter();
450        let mut update = updates.next();
451        if !self.deletion_set.delete_storage_first {
452            let mut suffix_closed_set =
453                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
454            for index in self
455                .context
456                .store()
457                .find_keys_by_prefix(&key_prefix)
458                .await?
459            {
460                loop {
461                    match update {
462                        Some((key, value)) if key <= &index => {
463                            if let Update::Set(_) = value {
464                                if !f(key)? {
465                                    return Ok(());
466                                }
467                            }
468                            update = updates.next();
469                            if key == &index {
470                                break;
471                            }
472                        }
473                        _ => {
474                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
475                                return Ok(());
476                            }
477                            break;
478                        }
479                    }
480                }
481            }
482        }
483        while let Some((key, value)) = update {
484            if let Update::Set(_) = value {
485                if !f(key)? {
486                    return Ok(());
487                }
488            }
489            update = updates.next();
490        }
491        Ok(())
492    }
493
494    /// Applies the function f over all indices.
495    /// ```rust
496    /// # tokio_test::block_on(async {
497    /// # use linera_views::context::MemoryContext;
498    /// # use linera_views::key_value_store_view::KeyValueStoreView;
499    /// # use linera_views::views::View;
500    /// # let context = MemoryContext::new_for_testing(());
501    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
502    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
503    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
504    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
505    /// let mut count = 0;
506    /// view.for_each_index(|_key| {
507    ///     count += 1;
508    ///     Ok(())
509    /// })
510    /// .await
511    /// .unwrap();
512    /// assert_eq!(count, 3);
513    /// # })
514    /// ```
515    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
516    where
517        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
518    {
519        self.for_each_index_while(|key| {
520            f(key)?;
521            Ok(true)
522        })
523        .await
524    }
525
526    /// Applies the function f over all index/value pairs.
527    /// If the function f returns false then the loop ends prematurely.
528    /// ```rust
529    /// # tokio_test::block_on(async {
530    /// # use linera_views::context::MemoryContext;
531    /// # use linera_views::key_value_store_view::KeyValueStoreView;
532    /// # use linera_views::views::View;
533    /// # let context = MemoryContext::new_for_testing(());
534    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
535    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
536    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
537    /// let mut values = Vec::new();
538    /// view.for_each_index_value_while(|_key, value| {
539    ///     values.push(value.to_vec());
540    ///     Ok(values.len() < 1)
541    /// })
542    /// .await
543    /// .unwrap();
544    /// assert_eq!(values, vec![vec![0]]);
545    /// # })
546    /// ```
547    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
548    where
549        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
550    {
551        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
552        let mut updates = self.updates.iter();
553        let mut update = updates.next();
554        if !self.deletion_set.delete_storage_first {
555            let mut suffix_closed_set =
556                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
557            for entry in self
558                .context
559                .store()
560                .find_key_values_by_prefix(&key_prefix)
561                .await?
562            {
563                let (index, index_val) = entry;
564                loop {
565                    match update {
566                        Some((key, value)) if key <= &index => {
567                            if let Update::Set(value) = value {
568                                if !f(key, value)? {
569                                    return Ok(());
570                                }
571                            }
572                            update = updates.next();
573                            if key == &index {
574                                break;
575                            }
576                        }
577                        _ => {
578                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
579                                return Ok(());
580                            }
581                            break;
582                        }
583                    }
584                }
585            }
586        }
587        while let Some((key, value)) = update {
588            if let Update::Set(value) = value {
589                if !f(key, value)? {
590                    return Ok(());
591                }
592            }
593            update = updates.next();
594        }
595        Ok(())
596    }
597
598    /// Applies the function f over all index/value pairs.
599    /// ```rust
600    /// # tokio_test::block_on(async {
601    /// # use linera_views::context::MemoryContext;
602    /// # use linera_views::key_value_store_view::KeyValueStoreView;
603    /// # use linera_views::views::View;
604    /// # let context = MemoryContext::new_for_testing(());
605    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
606    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
607    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
608    /// let mut part_keys = Vec::new();
609    /// view.for_each_index_while(|key| {
610    ///     part_keys.push(key.to_vec());
611    ///     Ok(part_keys.len() < 1)
612    /// })
613    /// .await
614    /// .unwrap();
615    /// assert_eq!(part_keys, vec![vec![0, 1]]);
616    /// # })
617    /// ```
618    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
619    where
620        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
621    {
622        self.for_each_index_value_while(|key, value| {
623            f(key, value)?;
624            Ok(true)
625        })
626        .await
627    }
628
629    /// Returns the list of indices in lexicographic order.
630    /// ```rust
631    /// # tokio_test::block_on(async {
632    /// # use linera_views::context::MemoryContext;
633    /// # use linera_views::key_value_store_view::KeyValueStoreView;
634    /// # use linera_views::views::View;
635    /// # let context = MemoryContext::new_for_testing(());
636    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
637    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
638    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
639    /// let indices = view.indices().await.unwrap();
640    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
641    /// # })
642    /// ```
643    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
644        let mut indices = Vec::new();
645        self.for_each_index(|index| {
646            indices.push(index.to_vec());
647            Ok(())
648        })
649        .await?;
650        Ok(indices)
651    }
652
653    /// Returns the list of indices and values in lexicographic order.
654    /// ```rust
655    /// # tokio_test::block_on(async {
656    /// # use linera_views::context::MemoryContext;
657    /// # use linera_views::key_value_store_view::KeyValueStoreView;
658    /// # use linera_views::views::View;
659    /// # let context = MemoryContext::new_for_testing(());
660    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
661    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
662    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
663    /// let key_values = view.indices().await.unwrap();
664    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
665    /// # })
666    /// ```
667    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
668        let mut index_values = Vec::new();
669        self.for_each_index_value(|index, value| {
670            index_values.push((index.to_vec(), value.to_vec()));
671            Ok(())
672        })
673        .await?;
674        Ok(index_values)
675    }
676
677    /// Returns the number of entries.
678    /// ```rust
679    /// # tokio_test::block_on(async {
680    /// # use linera_views::context::MemoryContext;
681    /// # use linera_views::key_value_store_view::KeyValueStoreView;
682    /// # use linera_views::views::View;
683    /// # let context = MemoryContext::new_for_testing(());
684    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
685    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
686    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
687    /// let count = view.count().await.unwrap();
688    /// assert_eq!(count, 2);
689    /// # })
690    /// ```
691    pub async fn count(&self) -> Result<usize, ViewError> {
692        let mut count = 0;
693        self.for_each_index(|_index| {
694            count += 1;
695            Ok(())
696        })
697        .await?;
698        Ok(count)
699    }
700
701    /// Obtains the value at the given index, if any.
702    /// ```rust
703    /// # tokio_test::block_on(async {
704    /// # use linera_views::context::MemoryContext;
705    /// # use linera_views::key_value_store_view::KeyValueStoreView;
706    /// # use linera_views::views::View;
707    /// # let context = MemoryContext::new_for_testing(());
708    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
709    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
710    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
711    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
712    /// # })
713    /// ```
714    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
715        #[cfg(with_metrics)]
716        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
717        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
718        if let Some(update) = self.updates.get(index) {
719            let value = match update {
720                Update::Removed => None,
721                Update::Set(value) => Some(value.clone()),
722            };
723            return Ok(value);
724        }
725        if self.deletion_set.contains_prefix_of(index) {
726            return Ok(None);
727        }
728        let key = self
729            .context
730            .base_key()
731            .base_tag_index(KeyTag::Index as u8, index);
732        Ok(self.context.store().read_value_bytes(&key).await?)
733    }
734
735    /// Tests whether the store contains a specific index.
736    /// ```rust
737    /// # tokio_test::block_on(async {
738    /// # use linera_views::context::MemoryContext;
739    /// # use linera_views::key_value_store_view::KeyValueStoreView;
740    /// # use linera_views::views::View;
741    /// # let context = MemoryContext::new_for_testing(());
742    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
743    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
744    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
745    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
746    /// # })
747    /// ```
748    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
749        #[cfg(with_metrics)]
750        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
751        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
752        if let Some(update) = self.updates.get(index) {
753            let test = match update {
754                Update::Removed => false,
755                Update::Set(_value) => true,
756            };
757            return Ok(test);
758        }
759        if self.deletion_set.contains_prefix_of(index) {
760            return Ok(false);
761        }
762        let key = self
763            .context
764            .base_key()
765            .base_tag_index(KeyTag::Index as u8, index);
766        Ok(self.context.store().contains_key(&key).await?)
767    }
768
769    /// Tests whether the view contains a range of indices
770    /// ```rust
771    /// # tokio_test::block_on(async {
772    /// # use linera_views::context::MemoryContext;
773    /// # use linera_views::key_value_store_view::KeyValueStoreView;
774    /// # use linera_views::views::View;
775    /// # let context = MemoryContext::new_for_testing(());
776    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
777    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
778    /// let keys = vec![vec![0, 1], vec![0, 2]];
779    /// let results = view.contains_keys(&keys).await.unwrap();
780    /// assert_eq!(results, vec![true, false]);
781    /// # })
782    /// ```
783    pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
784        #[cfg(with_metrics)]
785        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
786        let mut results = Vec::with_capacity(indices.len());
787        let mut missed_indices = Vec::new();
788        let mut vector_query = Vec::new();
789        for (i, index) in indices.iter().enumerate() {
790            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
791            if let Some(update) = self.updates.get(index) {
792                let value = match update {
793                    Update::Removed => false,
794                    Update::Set(_) => true,
795                };
796                results.push(value);
797            } else {
798                results.push(false);
799                if !self.deletion_set.contains_prefix_of(index) {
800                    missed_indices.push(i);
801                    let key = self
802                        .context
803                        .base_key()
804                        .base_tag_index(KeyTag::Index as u8, index);
805                    vector_query.push(key);
806                }
807            }
808        }
809        let values = self.context.store().contains_keys(&vector_query).await?;
810        for (i, value) in missed_indices.into_iter().zip(values) {
811            results[i] = value;
812        }
813        Ok(results)
814    }
815
816    /// Obtains the values of a range of indices
817    /// ```rust
818    /// # tokio_test::block_on(async {
819    /// # use linera_views::context::MemoryContext;
820    /// # use linera_views::key_value_store_view::KeyValueStoreView;
821    /// # use linera_views::views::View;
822    /// # let context = MemoryContext::new_for_testing(());
823    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
824    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
825    /// assert_eq!(
826    ///     view.multi_get(&[vec![0, 1], vec![0, 2]]).await.unwrap(),
827    ///     vec![Some(vec![42]), None]
828    /// );
829    /// # })
830    /// ```
831    pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
832        #[cfg(with_metrics)]
833        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
834        let mut result = Vec::with_capacity(indices.len());
835        let mut missed_indices = Vec::new();
836        let mut vector_query = Vec::new();
837        for (i, index) in indices.iter().enumerate() {
838            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
839            if let Some(update) = self.updates.get(index) {
840                let value = match update {
841                    Update::Removed => None,
842                    Update::Set(value) => Some(value.clone()),
843                };
844                result.push(value);
845            } else {
846                result.push(None);
847                if !self.deletion_set.contains_prefix_of(index) {
848                    missed_indices.push(i);
849                    let key = self
850                        .context
851                        .base_key()
852                        .base_tag_index(KeyTag::Index as u8, index);
853                    vector_query.push(key);
854                }
855            }
856        }
857        let values = self
858            .context
859            .store()
860            .read_multi_values_bytes(&vector_query)
861            .await?;
862        for (i, value) in missed_indices.into_iter().zip(values) {
863            result[i] = value;
864        }
865        Ok(result)
866    }
867
868    /// Applies the given batch of `crate::common::WriteOperation`.
869    /// ```rust
870    /// # tokio_test::block_on(async {
871    /// # use linera_views::context::MemoryContext;
872    /// # use linera_views::key_value_store_view::KeyValueStoreView;
873    /// # use linera_views::batch::Batch;
874    /// # use linera_views::views::View;
875    /// # let context = MemoryContext::new_for_testing(());
876    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
877    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
878    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
879    /// let mut batch = Batch::new();
880    /// batch.delete_key_prefix(vec![0]);
881    /// view.write_batch(batch).await.unwrap();
882    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
883    /// assert_eq!(key_values, vec![]);
884    /// # })
885    /// ```
886    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
887        #[cfg(with_metrics)]
888        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
889        *self.hash.get_mut().unwrap() = None;
890        let max_key_size = self.max_key_size();
891        for operation in batch.operations {
892            match operation {
893                WriteOperation::Delete { key } => {
894                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
895                    if let Some(value) = self.sizes.get(&key).await? {
896                        let entry_size = SizeData {
897                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
898                            value,
899                        };
900                        self.total_size.sub_assign(entry_size);
901                    }
902                    self.sizes.remove(key.clone());
903                    if self.deletion_set.contains_prefix_of(&key) {
904                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
905                        self.updates.remove(&key);
906                    } else {
907                        self.updates.insert(key, Update::Removed);
908                    }
909                }
910                WriteOperation::Put { key, value } => {
911                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
912                    let entry_size = SizeData {
913                        key: key.len() as u32,
914                        value: value.len() as u32,
915                    };
916                    self.total_size.add_assign(entry_size)?;
917                    if let Some(value) = self.sizes.get(&key).await? {
918                        let entry_size = SizeData {
919                            key: key.len() as u32,
920                            value,
921                        };
922                        self.total_size.sub_assign(entry_size);
923                    }
924                    self.sizes.insert(key.clone(), entry_size.value);
925                    self.updates.insert(key, Update::Set(value));
926                }
927                WriteOperation::DeletePrefix { key_prefix } => {
928                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
929                    let key_list = self
930                        .updates
931                        .range(get_key_range_for_prefix(key_prefix.clone()))
932                        .map(|x| x.0.to_vec())
933                        .collect::<Vec<_>>();
934                    for key in key_list {
935                        self.updates.remove(&key);
936                    }
937                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
938                    for (key, value) in key_values {
939                        let entry_size = SizeData {
940                            key: key.len() as u32,
941                            value,
942                        };
943                        self.total_size.sub_assign(entry_size);
944                        self.sizes.remove(key);
945                    }
946                    self.sizes.remove_by_prefix(key_prefix.clone());
947                    self.deletion_set.insert_key_prefix(key_prefix);
948                }
949            }
950        }
951        Ok(())
952    }
953
954    /// Sets or inserts a value.
955    /// ```rust
956    /// # tokio_test::block_on(async {
957    /// # use linera_views::context::MemoryContext;
958    /// # use linera_views::key_value_store_view::KeyValueStoreView;
959    /// # use linera_views::views::View;
960    /// # let context = MemoryContext::new_for_testing(());
961    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
962    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
963    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
964    /// # })
965    /// ```
966    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
967        let mut batch = Batch::new();
968        batch.put_key_value_bytes(index, value);
969        self.write_batch(batch).await
970    }
971
972    /// Removes a value. If absent then the action has no effect.
973    /// ```rust
974    /// # tokio_test::block_on(async {
975    /// # use linera_views::context::MemoryContext;
976    /// # use linera_views::key_value_store_view::KeyValueStoreView;
977    /// # use linera_views::views::View;
978    /// # let context = MemoryContext::new_for_testing(());
979    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
980    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
981    /// view.remove(vec![0, 1]).await.unwrap();
982    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
983    /// # })
984    /// ```
985    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
986        let mut batch = Batch::new();
987        batch.delete_key(index);
988        self.write_batch(batch).await
989    }
990
991    /// Deletes a key prefix.
992    /// ```rust
993    /// # tokio_test::block_on(async {
994    /// # use linera_views::context::MemoryContext;
995    /// # use linera_views::key_value_store_view::KeyValueStoreView;
996    /// # use linera_views::views::View;
997    /// # let context = MemoryContext::new_for_testing(());
998    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
999    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1000    /// view.remove_by_prefix(vec![0]).await.unwrap();
1001    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
1002    /// # })
1003    /// ```
1004    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
1005        let mut batch = Batch::new();
1006        batch.delete_key_prefix(key_prefix);
1007        self.write_batch(batch).await
1008    }
1009
    /// Returns all the keys matching the given prefix. The prefix is not included in the returned keys.
    ///
    /// The result merges the keys found in the store with the pending updates of the
    /// view: keys set by a pending update are included, keys removed by a pending
    /// update or covered by a deleted prefix are excluded.
    ///
    /// # Errors
    ///
    /// Returns `ViewError::KeyTooLong` if `key_prefix` is longer than `max_key_size()`,
    /// and propagates any error of the underlying store.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::key_value_store_view::KeyValueStoreView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
    /// assert_eq!(keys, vec![vec![1]]);
    /// # })
    /// ```
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        // Length of the prefix, stripped from the pending-update keys below.
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        // Pending updates whose key starts with `key_prefix`, in ascending key order.
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        // The store is only consulted when `delete_storage_first` is not set;
        // otherwise only the pending updates contribute to the result.
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // NOTE(review): the merge below assumes the store returns the keys in
            // ascending order and with the queried prefix removed — confirm against
            // the store contract.
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        // The next pending update sorts before or at the stored key:
                        // emit it if it is a `Set`, then advance the updates.
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            // The update targets this very stored key and overrides it,
                            // so the stored key itself is not emitted.
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        // No pending update at or before the stored key: keep the stored
                        // key unless it is covered by a deleted prefix.
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit the pending updates that remain after the stored keys are exhausted.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }
1085
    /// Returns all the key-value pairs, for keys matching the given prefix. The
    /// prefix is not included in the returned keys.
    ///
    /// The result merges the entries found in the store with the pending updates of
    /// the view: entries set by a pending update are included with the pending value,
    /// entries removed by a pending update or covered by a deleted prefix are excluded.
    ///
    /// # Errors
    ///
    /// Returns `ViewError::KeyTooLong` if `key_prefix` is longer than `max_key_size()`,
    /// and propagates any error of the underlying store.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::key_value_store_view::KeyValueStoreView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
    /// # })
    /// ```
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        // Length of the prefix, stripped from the pending-update keys below.
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        // Pending updates whose key starts with `key_prefix`, in ascending key order.
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        // The store is only consulted when `delete_storage_first` is not set;
        // otherwise only the pending updates contribute to the result.
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // NOTE(review): the merge below assumes the store returns the entries in
            // ascending key order and with the queried prefix removed from the keys —
            // confirm against the store contract.
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                loop {
                    match update {
                        // The next pending update sorts before or at the stored key:
                        // emit it if it is a `Set`, then advance the updates.
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            // The update targets this very stored key and overrides it,
                            // so the stored entry itself is not emitted.
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        // No pending update at or before the stored key: keep the stored
                        // entry unless its key is covered by a deleted prefix.
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit the pending updates that remain after the stored entries are exhausted.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }
1166
1167    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1168        #[cfg(with_metrics)]
1169        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1170        let mut hasher = sha3::Sha3_256::default();
1171        let mut count = 0u32;
1172        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1173            count += 1;
1174            hasher.update_with_bytes(index)?;
1175            hasher.update_with_bytes(value)?;
1176            Ok(())
1177        })
1178        .await?;
1179        hasher.update_with_bcs_bytes(&count)?;
1180        Ok(hasher.finalize())
1181    }
1182}
1183
1184impl<C: Context> HashableView for KeyValueStoreView<C> {
1185    type Hasher = sha3::Sha3_256;
1186
1187    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1188        let hash = *self.hash.get_mut().unwrap();
1189        match hash {
1190            Some(hash) => Ok(hash),
1191            None => {
1192                let new_hash = self.compute_hash().await?;
1193                let hash = self.hash.get_mut().unwrap();
1194                *hash = Some(new_hash);
1195                Ok(new_hash)
1196            }
1197        }
1198    }
1199
1200    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1201        let hash = *self.hash.lock().unwrap();
1202        match hash {
1203            Some(hash) => Ok(hash),
1204            None => {
1205                let new_hash = self.compute_hash().await?;
1206                let mut hash = self.hash.lock().unwrap();
1207                *hash = Some(new_hash);
1208                Ok(new_hash)
1209            }
1210        }
1211    }
1212}
1213
/// Type wrapping `KeyValueStoreView` while memoizing the hash.
///
/// This is a `WrappedHashableContainerView` around `KeyValueStoreView<C>` using
/// `HasherOutput` as the hash type.
pub type HashedKeyValueStoreView<C> =
    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;
1217
/// Wrapper around `KeyValueStoreView` to compute hashes based on the history of changes.
///
/// This is a `HistoricallyHashableView` around `KeyValueStoreView<C>`.
pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1220
/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
///
/// The view is held behind an `Arc<RwLock<_>>`, so clones of a container share the
/// same underlying view.
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    /// The shared view backing this container.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1227
// Operations on a `ViewContainer` report their failures as `ViewContainerError`.
#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    type Error = ViewContainerError;
}
1232
/// Errors that can be returned by [`ViewContainer`] operations.
#[cfg(with_testing)]
#[derive(Debug, Error)]
pub enum ViewContainerError {
    /// An error coming from a view operation.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// An error coming from BCS serialization.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1245
// Registers `ViewContainerError` as a key-value store error, with the backend
// name "view_container".
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    const BACKEND: &'static str = "view_container";
}
1250
// Read access to the container: every operation takes the read lock on the shared
// view and forwards to the corresponding method of `KeyValueStoreView`.
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        1
    }

    /// The container always reports an empty root key.
    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
        Ok(Vec::new())
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        let value = guard.get(key).await?;
        Ok(value)
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        let guard = self.view.read().await;
        let present = guard.contains_key(key).await?;
        Ok(present)
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
        let guard = self.view.read().await;
        let present = guard.contains_keys(keys).await?;
        Ok(present)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        let guard = self.view.read().await;
        let values = guard.multi_get(keys).await?;
        Ok(values)
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        let keys = guard.find_keys_by_prefix(key_prefix).await?;
        Ok(keys)
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        let guard = self.view.read().await;
        let key_values = guard.find_key_values_by_prefix(key_prefix).await?;
        Ok(key_values)
    }
}
1302
// Write access to the container: the batch is applied to the shared view and the
// resulting changes are written to the store of the view's context.
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies `batch` to the view under the write lock, then writes the batch
    /// produced by `pre_save` to the store of the context.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut guard = self.view.write().await;
        guard.write_batch(batch).await?;
        let mut storage_batch = Batch::new();
        guard.pre_save(&mut storage_batch)?;
        // NOTE(review): `post_save` runs before the store write is known to have
        // succeeded — confirm this ordering is intended.
        guard.post_save();
        let written = guard.context().store().write_batch(storage_batch).await;
        written.map_err(ViewError::from)?;
        Ok(())
    }

    /// Does nothing and always succeeds.
    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}
1325
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Creates a [`ViewContainer`] by loading a `KeyValueStoreView` from `context`.
    ///
    /// Any error returned while loading the view is propagated.
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let loaded = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(loaded)),
        })
    }
}