Skip to main content

linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! We implement two types:
5//! 1) The first type `KeyValueStoreView` implements View and the function of `KeyValueStore`.
6//!
7//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
8//!    * The `Clone` trait
9//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
10//!    * a `write_batch` that writes in the context instead of writing of the view.
11//!
12//! Currently, that second type is only used for tests.
13//!
14//! Key tags to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};
17
18use allocative::Allocative;
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{ensure, visit_allocative_simple};
22
23use crate::{
24    batch::{Batch, WriteOperation},
25    common::{
26        from_bytes_option, get_key_range_for_prefix, get_upper_bound, DeletionSet, HasherOutput,
27        SuffixClosedSetIterator, Update,
28    },
29    context::Context,
30    hashable_wrapper::WrappedHashableContainerView,
31    historical_hash_wrapper::HistoricallyHashableView,
32    store::ReadableKeyValueStore,
33    views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
34};
35
36#[cfg(with_metrics)]
37mod metrics {
38    use std::sync::LazyLock;
39
40    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
41    use prometheus::HistogramVec;
42
43    /// The latency of hash computation
44    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
45        register_histogram_vec(
46            "key_value_store_view_hash_latency",
47            "KeyValueStoreView hash latency",
48            &[],
49            exponential_bucket_latencies(5.0),
50        )
51    });
52
53    /// The latency of get operation
54    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
55        register_histogram_vec(
56            "key_value_store_view_get_latency",
57            "KeyValueStoreView get latency",
58            &[],
59            exponential_bucket_latencies(5.0),
60        )
61    });
62
63    /// The latency of multi get
64    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
65        LazyLock::new(|| {
66            register_histogram_vec(
67                "key_value_store_view_multi_get_latency",
68                "KeyValueStoreView multi get latency",
69                &[],
70                exponential_bucket_latencies(5.0),
71            )
72        });
73
74    /// The latency of contains key
75    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
76        LazyLock::new(|| {
77            register_histogram_vec(
78                "key_value_store_view_contains_key_latency",
79                "KeyValueStoreView contains key latency",
80                &[],
81                exponential_bucket_latencies(5.0),
82            )
83        });
84
85    /// The latency of contains keys
86    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
87        LazyLock::new(|| {
88            register_histogram_vec(
89                "key_value_store_view_contains_keys_latency",
90                "KeyValueStoreView contains keys latency",
91                &[],
92                exponential_bucket_latencies(5.0),
93            )
94        });
95
96    /// The latency of find keys by prefix operation
97    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
98        LazyLock::new(|| {
99            register_histogram_vec(
100                "key_value_store_view_find_keys_by_prefix_latency",
101                "KeyValueStoreView find keys by prefix latency",
102                &[],
103                exponential_bucket_latencies(5.0),
104            )
105        });
106
107    /// The latency of find key values by prefix operation
108    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
109        LazyLock::new(|| {
110            register_histogram_vec(
111                "key_value_store_view_find_key_values_by_prefix_latency",
112                "KeyValueStoreView find key values by prefix latency",
113                &[],
114                exponential_bucket_latencies(5.0),
115            )
116        });
117
118    /// The latency of write batch operation
119    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
120        LazyLock::new(|| {
121            register_histogram_vec(
122                "key_value_store_view_write_batch_latency",
123                "KeyValueStoreView write batch latency",
124                &[],
125                exponential_bucket_latencies(5.0),
126            )
127        });
128}
129
130#[cfg(with_testing)]
131use {
132    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
133    async_lock::RwLock,
134    std::sync::Arc,
135    thiserror::Error,
136};
137
138#[repr(u8)]
139enum KeyTag {
140    /// Prefix for the indices of the view.
141    Index = MIN_VIEW_TAG,
142    /// Prefix for the hash.
143    Hash,
144}
145
146/// A view that represents the functions of `KeyValueStore`.
147///
148/// Comment on the data set:
149/// In order to work, the view needs to store the updates and deleted prefixes.
150/// The updates and deleted prefixes have to be coherent. This means:
151/// * If an index is deleted by one in deleted prefixes then it should not be present
152///   in the updates at all.
153/// * [`DeletePrefix::key_prefix`][entry1] should not dominate anyone. That is if we have `[0,2]`
154///   then we should not have `[0,2,3]` since it would be dominated by the preceding.
155///
156/// With that we have:
157/// * in order to test if an `index` is deleted by a prefix we compute the highest deleted prefix `dp`
158///   such that `dp <= index`.
159///   If `dp` is indeed a prefix then we conclude that `index` is deleted, otherwise not.
160///   The no domination is essential here.
161///
162/// [entry1]: crate::batch::WriteOperation::DeletePrefix
163#[derive(Debug, Allocative)]
164#[allocative(bound = "C")]
165pub struct KeyValueStoreView<C> {
166    /// The view context.
167    #[allocative(skip)]
168    context: C,
169    /// Tracks deleted key prefixes.
170    deletion_set: DeletionSet,
171    /// Pending changes not yet persisted to storage.
172    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
173    /// The hash persisted in storage.
174    #[allocative(visit = visit_allocative_simple)]
175    stored_hash: Option<HasherOutput>,
176    /// Memoized hash, if any.
177    #[allocative(visit = visit_allocative_simple)]
178    hash: Mutex<Option<HasherOutput>>,
179}
180
181impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
182    type Target = KeyValueStoreView<C2>;
183
184    async fn with_context(
185        &mut self,
186        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
187    ) -> Self::Target {
188        let hash = *self.hash.lock().unwrap();
189        KeyValueStoreView {
190            context: ctx.clone()(&self.context),
191            deletion_set: self.deletion_set.clone(),
192            updates: self.updates.clone(),
193            stored_hash: self.stored_hash,
194            hash: Mutex::new(hash),
195        }
196    }
197}
198
199impl<C: Context> View for KeyValueStoreView<C> {
200    const NUM_INIT_KEYS: usize = 1;
201
202    type Context = C;
203
204    fn context(&self) -> C {
205        self.context.clone()
206    }
207
208    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
209        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
210        Ok(vec![key_hash])
211    }
212
213    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
214        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
215        Ok(Self {
216            context,
217            deletion_set: DeletionSet::new(),
218            updates: BTreeMap::new(),
219            stored_hash: hash,
220            hash: Mutex::new(hash),
221        })
222    }
223
224    fn rollback(&mut self) {
225        self.deletion_set.rollback();
226        self.updates.clear();
227        *self.hash.get_mut().unwrap() = self.stored_hash;
228    }
229
230    async fn has_pending_changes(&self) -> bool {
231        if self.deletion_set.has_pending_changes() {
232            return true;
233        }
234        if !self.updates.is_empty() {
235            return true;
236        }
237        let hash = self.hash.lock().unwrap();
238        self.stored_hash != *hash
239    }
240
241    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
242        let mut delete_view = false;
243        if self.deletion_set.delete_storage_first {
244            delete_view = true;
245            batch.delete_key_prefix(self.context.base_key().bytes.clone());
246            for (index, update) in self.updates.iter() {
247                if let Update::Set(value) = update {
248                    let key = self
249                        .context
250                        .base_key()
251                        .base_tag_index(KeyTag::Index as u8, index);
252                    batch.put_key_value_bytes(key, value.clone());
253                    delete_view = false;
254                }
255            }
256        } else {
257            for index in self.deletion_set.deleted_prefixes.iter() {
258                let key = self
259                    .context
260                    .base_key()
261                    .base_tag_index(KeyTag::Index as u8, index);
262                batch.delete_key_prefix(key);
263            }
264            for (index, update) in self.updates.iter() {
265                let key = self
266                    .context
267                    .base_key()
268                    .base_tag_index(KeyTag::Index as u8, index);
269                match update {
270                    Update::Removed => batch.delete_key(key),
271                    Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
272                }
273            }
274        }
275        let hash = *self.hash.lock().unwrap();
276        if self.stored_hash != hash {
277            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
278            match hash {
279                None => batch.delete_key(key),
280                Some(hash) => batch.put_key_value(key, &hash)?,
281            }
282        }
283        Ok(delete_view)
284    }
285
286    fn post_save(&mut self) {
287        self.deletion_set.delete_storage_first = false;
288        self.deletion_set.deleted_prefixes.clear();
289        self.updates.clear();
290        let hash = *self.hash.lock().unwrap();
291        self.stored_hash = hash;
292    }
293
294    fn clear(&mut self) {
295        self.deletion_set.clear();
296        self.updates.clear();
297        *self.hash.get_mut().unwrap() = None;
298    }
299}
300
301impl<C: Context> ClonableView for KeyValueStoreView<C> {
302    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
303        Ok(KeyValueStoreView {
304            context: self.context.clone(),
305            deletion_set: self.deletion_set.clone(),
306            updates: self.updates.clone(),
307            stored_hash: self.stored_hash,
308            hash: Mutex::new(*self.hash.get_mut().unwrap()),
309        })
310    }
311}
312
313impl<C: Context> KeyValueStoreView<C> {
314    fn max_key_size(&self) -> usize {
315        let prefix_len = self.context.base_key().bytes.len();
316        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
317    }
318
319    /// Applies the function f over all indices. If the function f returns
320    /// false, then the loop ends prematurely.
321    /// ```rust
322    /// # tokio_test::block_on(async {
323    /// # use linera_views::context::MemoryContext;
324    /// # use linera_views::key_value_store_view::KeyValueStoreView;
325    /// # use linera_views::views::View;
326    /// # let context = MemoryContext::new_for_testing(());
327    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
328    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
329    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
330    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
331    /// let mut count = 0;
332    /// view.for_each_index_while(|_key| {
333    ///     count += 1;
334    ///     Ok(count < 2)
335    /// })
336    /// .await
337    /// .unwrap();
338    /// assert_eq!(count, 2);
339    /// # })
340    /// ```
341    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
342    where
343        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
344    {
345        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
346        let mut updates = self.updates.iter();
347        let mut update = updates.next();
348        if !self.deletion_set.delete_storage_first {
349            let mut suffix_closed_set =
350                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
351            for index in self
352                .context
353                .store()
354                .find_keys_by_prefix(&key_prefix)
355                .await?
356            {
357                loop {
358                    match update {
359                        Some((key, value)) if key <= &index => {
360                            if let Update::Set(_) = value {
361                                if !f(key)? {
362                                    return Ok(());
363                                }
364                            }
365                            update = updates.next();
366                            if key == &index {
367                                break;
368                            }
369                        }
370                        _ => {
371                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
372                                return Ok(());
373                            }
374                            break;
375                        }
376                    }
377                }
378            }
379        }
380        while let Some((key, value)) = update {
381            if let Update::Set(_) = value {
382                if !f(key)? {
383                    return Ok(());
384                }
385            }
386            update = updates.next();
387        }
388        Ok(())
389    }
390
391    /// Applies the function f over all indices.
392    /// ```rust
393    /// # tokio_test::block_on(async {
394    /// # use linera_views::context::MemoryContext;
395    /// # use linera_views::key_value_store_view::KeyValueStoreView;
396    /// # use linera_views::views::View;
397    /// # let context = MemoryContext::new_for_testing(());
398    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
399    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
400    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
401    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
402    /// let mut count = 0;
403    /// view.for_each_index(|_key| {
404    ///     count += 1;
405    ///     Ok(())
406    /// })
407    /// .await
408    /// .unwrap();
409    /// assert_eq!(count, 3);
410    /// # })
411    /// ```
412    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
413    where
414        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
415    {
416        self.for_each_index_while(|key| {
417            f(key)?;
418            Ok(true)
419        })
420        .await
421    }
422
423    /// Applies the function f over all index/value pairs.
424    /// If the function f returns false then the loop ends prematurely.
425    /// ```rust
426    /// # tokio_test::block_on(async {
427    /// # use linera_views::context::MemoryContext;
428    /// # use linera_views::key_value_store_view::KeyValueStoreView;
429    /// # use linera_views::views::View;
430    /// # let context = MemoryContext::new_for_testing(());
431    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
432    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
433    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
434    /// let mut values = Vec::new();
435    /// view.for_each_index_value_while(|_key, value| {
436    ///     values.push(value.to_vec());
437    ///     Ok(values.len() < 1)
438    /// })
439    /// .await
440    /// .unwrap();
441    /// assert_eq!(values, vec![vec![0]]);
442    /// # })
443    /// ```
444    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
445    where
446        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
447    {
448        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
449        let mut updates = self.updates.iter();
450        let mut update = updates.next();
451        if !self.deletion_set.delete_storage_first {
452            let mut suffix_closed_set =
453                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
454            for entry in self
455                .context
456                .store()
457                .find_key_values_by_prefix(&key_prefix)
458                .await?
459            {
460                let (index, index_val) = entry;
461                loop {
462                    match update {
463                        Some((key, value)) if key <= &index => {
464                            if let Update::Set(value) = value {
465                                if !f(key, value)? {
466                                    return Ok(());
467                                }
468                            }
469                            update = updates.next();
470                            if key == &index {
471                                break;
472                            }
473                        }
474                        _ => {
475                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
476                                return Ok(());
477                            }
478                            break;
479                        }
480                    }
481                }
482            }
483        }
484        while let Some((key, value)) = update {
485            if let Update::Set(value) = value {
486                if !f(key, value)? {
487                    return Ok(());
488                }
489            }
490            update = updates.next();
491        }
492        Ok(())
493    }
494
495    /// Applies the function f over all index/value pairs.
496    /// ```rust
497    /// # tokio_test::block_on(async {
498    /// # use linera_views::context::MemoryContext;
499    /// # use linera_views::key_value_store_view::KeyValueStoreView;
500    /// # use linera_views::views::View;
501    /// # let context = MemoryContext::new_for_testing(());
502    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
503    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
504    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
505    /// let mut part_keys = Vec::new();
506    /// view.for_each_index_while(|key| {
507    ///     part_keys.push(key.to_vec());
508    ///     Ok(part_keys.len() < 1)
509    /// })
510    /// .await
511    /// .unwrap();
512    /// assert_eq!(part_keys, vec![vec![0, 1]]);
513    /// # })
514    /// ```
515    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
516    where
517        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
518    {
519        self.for_each_index_value_while(|key, value| {
520            f(key, value)?;
521            Ok(true)
522        })
523        .await
524    }
525
526    /// Returns the list of indices in lexicographic order.
527    /// ```rust
528    /// # tokio_test::block_on(async {
529    /// # use linera_views::context::MemoryContext;
530    /// # use linera_views::key_value_store_view::KeyValueStoreView;
531    /// # use linera_views::views::View;
532    /// # let context = MemoryContext::new_for_testing(());
533    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
534    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
535    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
536    /// let indices = view.indices().await.unwrap();
537    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
538    /// # })
539    /// ```
540    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
541        let mut indices = Vec::new();
542        self.for_each_index(|index| {
543            indices.push(index.to_vec());
544            Ok(())
545        })
546        .await?;
547        Ok(indices)
548    }
549
550    /// Returns the list of indices and values in lexicographic order.
551    /// ```rust
552    /// # tokio_test::block_on(async {
553    /// # use linera_views::context::MemoryContext;
554    /// # use linera_views::key_value_store_view::KeyValueStoreView;
555    /// # use linera_views::views::View;
556    /// # let context = MemoryContext::new_for_testing(());
557    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
558    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
559    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
560    /// let key_values = view.indices().await.unwrap();
561    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
562    /// # })
563    /// ```
564    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
565        let mut index_values = Vec::new();
566        self.for_each_index_value(|index, value| {
567            index_values.push((index.to_vec(), value.to_vec()));
568            Ok(())
569        })
570        .await?;
571        Ok(index_values)
572    }
573
574    /// Returns the number of entries.
575    /// ```rust
576    /// # tokio_test::block_on(async {
577    /// # use linera_views::context::MemoryContext;
578    /// # use linera_views::key_value_store_view::KeyValueStoreView;
579    /// # use linera_views::views::View;
580    /// # let context = MemoryContext::new_for_testing(());
581    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
582    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
583    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
584    /// let count = view.count().await.unwrap();
585    /// assert_eq!(count, 2);
586    /// # })
587    /// ```
588    pub async fn count(&self) -> Result<usize, ViewError> {
589        let mut count = 0;
590        self.for_each_index(|_index| {
591            count += 1;
592            Ok(())
593        })
594        .await?;
595        Ok(count)
596    }
597
598    /// Obtains the value at the given index, if any.
599    /// ```rust
600    /// # tokio_test::block_on(async {
601    /// # use linera_views::context::MemoryContext;
602    /// # use linera_views::key_value_store_view::KeyValueStoreView;
603    /// # use linera_views::views::View;
604    /// # let context = MemoryContext::new_for_testing(());
605    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
606    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
607    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
608    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
609    /// # })
610    /// ```
611    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
612        #[cfg(with_metrics)]
613        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
614        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
615        if let Some(update) = self.updates.get(index) {
616            let value = match update {
617                Update::Removed => None,
618                Update::Set(value) => Some(value.clone()),
619            };
620            return Ok(value);
621        }
622        if self.deletion_set.contains_prefix_of(index) {
623            return Ok(None);
624        }
625        let key = self
626            .context
627            .base_key()
628            .base_tag_index(KeyTag::Index as u8, index);
629        Ok(self.context.store().read_value_bytes(&key).await?)
630    }
631
632    /// Tests whether the store contains a specific index.
633    /// ```rust
634    /// # tokio_test::block_on(async {
635    /// # use linera_views::context::MemoryContext;
636    /// # use linera_views::key_value_store_view::KeyValueStoreView;
637    /// # use linera_views::views::View;
638    /// # let context = MemoryContext::new_for_testing(());
639    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
640    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
641    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
642    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
643    /// # })
644    /// ```
645    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
646        #[cfg(with_metrics)]
647        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
648        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
649        if let Some(update) = self.updates.get(index) {
650            let test = match update {
651                Update::Removed => false,
652                Update::Set(_value) => true,
653            };
654            return Ok(test);
655        }
656        if self.deletion_set.contains_prefix_of(index) {
657            return Ok(false);
658        }
659        let key = self
660            .context
661            .base_key()
662            .base_tag_index(KeyTag::Index as u8, index);
663        Ok(self.context.store().contains_key(&key).await?)
664    }
665
666    /// Tests whether the view contains a range of indices
667    /// ```rust
668    /// # tokio_test::block_on(async {
669    /// # use linera_views::context::MemoryContext;
670    /// # use linera_views::key_value_store_view::KeyValueStoreView;
671    /// # use linera_views::views::View;
672    /// # let context = MemoryContext::new_for_testing(());
673    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
674    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
675    /// let keys = vec![vec![0, 1], vec![0, 2]];
676    /// let results = view.contains_keys(&keys).await.unwrap();
677    /// assert_eq!(results, vec![true, false]);
678    /// # })
679    /// ```
680    pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
681        #[cfg(with_metrics)]
682        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
683        let mut results = Vec::with_capacity(indices.len());
684        let mut missed_indices = Vec::new();
685        let mut vector_query = Vec::new();
686        for (i, index) in indices.iter().enumerate() {
687            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
688            if let Some(update) = self.updates.get(index) {
689                let value = match update {
690                    Update::Removed => false,
691                    Update::Set(_) => true,
692                };
693                results.push(value);
694            } else {
695                results.push(false);
696                if !self.deletion_set.contains_prefix_of(index) {
697                    missed_indices.push(i);
698                    let key = self
699                        .context
700                        .base_key()
701                        .base_tag_index(KeyTag::Index as u8, index);
702                    vector_query.push(key);
703                }
704            }
705        }
706        let values = self.context.store().contains_keys(&vector_query).await?;
707        for (i, value) in missed_indices.into_iter().zip(values) {
708            results[i] = value;
709        }
710        Ok(results)
711    }
712
713    /// Obtains the values of a range of indices
714    /// ```rust
715    /// # tokio_test::block_on(async {
716    /// # use linera_views::context::MemoryContext;
717    /// # use linera_views::key_value_store_view::KeyValueStoreView;
718    /// # use linera_views::views::View;
719    /// # let context = MemoryContext::new_for_testing(());
720    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
721    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
722    /// assert_eq!(
723    ///     view.multi_get(&[vec![0, 1], vec![0, 2]]).await.unwrap(),
724    ///     vec![Some(vec![42]), None]
725    /// );
726    /// # })
727    /// ```
728    pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
729        #[cfg(with_metrics)]
730        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
731        let mut result = Vec::with_capacity(indices.len());
732        let mut missed_indices = Vec::new();
733        let mut vector_query = Vec::new();
734        for (i, index) in indices.iter().enumerate() {
735            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
736            if let Some(update) = self.updates.get(index) {
737                let value = match update {
738                    Update::Removed => None,
739                    Update::Set(value) => Some(value.clone()),
740                };
741                result.push(value);
742            } else {
743                result.push(None);
744                if !self.deletion_set.contains_prefix_of(index) {
745                    missed_indices.push(i);
746                    let key = self
747                        .context
748                        .base_key()
749                        .base_tag_index(KeyTag::Index as u8, index);
750                    vector_query.push(key);
751                }
752            }
753        }
754        let values = self
755            .context
756            .store()
757            .read_multi_values_bytes(&vector_query)
758            .await?;
759        for (i, value) in missed_indices.into_iter().zip(values) {
760            result[i] = value;
761        }
762        Ok(result)
763    }
764
765    /// Applies the given batch of `crate::common::WriteOperation`.
766    /// ```rust
767    /// # tokio_test::block_on(async {
768    /// # use linera_views::context::MemoryContext;
769    /// # use linera_views::key_value_store_view::KeyValueStoreView;
770    /// # use linera_views::batch::Batch;
771    /// # use linera_views::views::View;
772    /// # let context = MemoryContext::new_for_testing(());
773    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
774    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
775    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
776    /// let mut batch = Batch::new();
777    /// batch.delete_key_prefix(vec![0]);
778    /// view.write_batch(batch).unwrap();
779    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
780    /// assert_eq!(key_values, vec![]);
781    /// # })
782    /// ```
783    pub fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
784        #[cfg(with_metrics)]
785        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
786        *self.hash.get_mut().unwrap() = None;
787        let max_key_size = self.max_key_size();
788        for operation in batch.operations {
789            match operation {
790                WriteOperation::Delete { key } => {
791                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
792                    if self.deletion_set.contains_prefix_of(&key) {
793                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
794                        self.updates.remove(&key);
795                    } else {
796                        self.updates.insert(key, Update::Removed);
797                    }
798                }
799                WriteOperation::Put { key, value } => {
800                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
801                    self.updates.insert(key, Update::Set(value));
802                }
803                WriteOperation::DeletePrefix { key_prefix } => {
804                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
805                    let key_list = self
806                        .updates
807                        .range(get_key_range_for_prefix(key_prefix.clone()))
808                        .map(|x| x.0.to_vec())
809                        .collect::<Vec<_>>();
810                    for key in key_list {
811                        self.updates.remove(&key);
812                    }
813                    self.deletion_set.insert_key_prefix(key_prefix);
814                }
815            }
816        }
817        Ok(())
818    }
819
820    /// Sets or inserts a value.
821    /// ```rust
822    /// # tokio_test::block_on(async {
823    /// # use linera_views::context::MemoryContext;
824    /// # use linera_views::key_value_store_view::KeyValueStoreView;
825    /// # use linera_views::views::View;
826    /// # let context = MemoryContext::new_for_testing(());
827    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
828    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
829    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
830    /// # })
831    /// ```
832    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
833        let mut batch = Batch::new();
834        batch.put_key_value_bytes(index, value);
835        self.write_batch(batch)
836    }
837
838    /// Removes a value. If absent then the action has no effect.
839    /// ```rust
840    /// # tokio_test::block_on(async {
841    /// # use linera_views::context::MemoryContext;
842    /// # use linera_views::key_value_store_view::KeyValueStoreView;
843    /// # use linera_views::views::View;
844    /// # let context = MemoryContext::new_for_testing(());
845    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
846    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
847    /// view.remove(vec![0, 1]).await.unwrap();
848    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
849    /// # })
850    /// ```
851    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
852        let mut batch = Batch::new();
853        batch.delete_key(index);
854        self.write_batch(batch)
855    }
856
857    /// Deletes a key prefix.
858    /// ```rust
859    /// # tokio_test::block_on(async {
860    /// # use linera_views::context::MemoryContext;
861    /// # use linera_views::key_value_store_view::KeyValueStoreView;
862    /// # use linera_views::views::View;
863    /// # let context = MemoryContext::new_for_testing(());
864    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
865    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
866    /// view.remove_by_prefix(vec![0]).await.unwrap();
867    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
868    /// # })
869    /// ```
870    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
871        let mut batch = Batch::new();
872        batch.delete_key_prefix(key_prefix);
873        self.write_batch(batch)
874    }
875
876    /// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
877    /// ```rust
878    /// # tokio_test::block_on(async {
879    /// # use linera_views::context::MemoryContext;
880    /// # use linera_views::key_value_store_view::KeyValueStoreView;
881    /// # use linera_views::views::View;
882    /// # let context = MemoryContext::new_for_testing(());
883    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
884    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
885    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
886    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
887    /// assert_eq!(keys, vec![vec![1]]);
888    /// # })
889    /// ```
890    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
891        #[cfg(with_metrics)]
892        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
893        ensure!(
894            key_prefix.len() <= self.max_key_size(),
895            ViewError::KeyTooLong
896        );
897        let len = key_prefix.len();
898        let key_prefix_full = self
899            .context
900            .base_key()
901            .base_tag_index(KeyTag::Index as u8, key_prefix);
902        let mut keys = Vec::new();
903        let key_prefix_upper = get_upper_bound(key_prefix);
904        let mut updates = self
905            .updates
906            .range((Included(key_prefix.to_vec()), key_prefix_upper));
907        let mut update = updates.next();
908        if !self.deletion_set.delete_storage_first {
909            let mut suffix_closed_set =
910                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
911            for key in self
912                .context
913                .store()
914                .find_keys_by_prefix(&key_prefix_full)
915                .await?
916            {
917                loop {
918                    match update {
919                        Some((update_key, update_value))
920                            if &update_key[len..] <= key.as_slice() =>
921                        {
922                            if let Update::Set(_) = update_value {
923                                keys.push(update_key[len..].to_vec());
924                            }
925                            update = updates.next();
926                            if update_key[len..] == key[..] {
927                                break;
928                            }
929                        }
930                        _ => {
931                            let mut key_with_prefix = key_prefix.to_vec();
932                            key_with_prefix.extend_from_slice(&key);
933                            if !suffix_closed_set.find_key(&key_with_prefix) {
934                                keys.push(key);
935                            }
936                            break;
937                        }
938                    }
939                }
940            }
941        }
942        while let Some((update_key, update_value)) = update {
943            if let Update::Set(_) = update_value {
944                let update_key = update_key[len..].to_vec();
945                keys.push(update_key);
946            }
947            update = updates.next();
948        }
949        Ok(keys)
950    }
951
952    /// Iterates over all the key-value pairs, for keys matching the given prefix. The
953    /// prefix is not included in the returned keys.
954    /// ```rust
955    /// # tokio_test::block_on(async {
956    /// # use linera_views::context::MemoryContext;
957    /// # use linera_views::key_value_store_view::KeyValueStoreView;
958    /// # use linera_views::views::View;
959    /// # let context = MemoryContext::new_for_testing(());
960    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
961    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
962    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
963    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
964    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
965    /// # })
966    /// ```
967    pub async fn find_key_values_by_prefix(
968        &self,
969        key_prefix: &[u8],
970    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
971        #[cfg(with_metrics)]
972        let _latency =
973            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
974        ensure!(
975            key_prefix.len() <= self.max_key_size(),
976            ViewError::KeyTooLong
977        );
978        let len = key_prefix.len();
979        let key_prefix_full = self
980            .context
981            .base_key()
982            .base_tag_index(KeyTag::Index as u8, key_prefix);
983        let mut key_values = Vec::new();
984        let key_prefix_upper = get_upper_bound(key_prefix);
985        let mut updates = self
986            .updates
987            .range((Included(key_prefix.to_vec()), key_prefix_upper));
988        let mut update = updates.next();
989        if !self.deletion_set.delete_storage_first {
990            let mut suffix_closed_set =
991                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
992            for entry in self
993                .context
994                .store()
995                .find_key_values_by_prefix(&key_prefix_full)
996                .await?
997            {
998                let (key, value) = entry;
999                loop {
1000                    match update {
1001                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
1002                            if let Update::Set(update_value) = update_value {
1003                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1004                                key_values.push(key_value);
1005                            }
1006                            update = updates.next();
1007                            if update_key[len..] == key[..] {
1008                                break;
1009                            }
1010                        }
1011                        _ => {
1012                            let mut key_with_prefix = key_prefix.to_vec();
1013                            key_with_prefix.extend_from_slice(&key);
1014                            if !suffix_closed_set.find_key(&key_with_prefix) {
1015                                key_values.push((key, value));
1016                            }
1017                            break;
1018                        }
1019                    }
1020                }
1021            }
1022        }
1023        while let Some((update_key, update_value)) = update {
1024            if let Update::Set(update_value) = update_value {
1025                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1026                key_values.push(key_value);
1027            }
1028            update = updates.next();
1029        }
1030        Ok(key_values)
1031    }
1032
1033    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1034        #[cfg(with_metrics)]
1035        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1036        let mut hasher = sha3::Sha3_256::default();
1037        let mut count = 0u32;
1038        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1039            count += 1;
1040            hasher.update_with_bytes(index)?;
1041            hasher.update_with_bytes(value)?;
1042            Ok(())
1043        })
1044        .await?;
1045        hasher.update_with_bcs_bytes(&count)?;
1046        Ok(hasher.finalize())
1047    }
1048}
1049
1050impl<C: Context> HashableView for KeyValueStoreView<C> {
1051    type Hasher = sha3::Sha3_256;
1052
1053    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1054        let hash = *self.hash.get_mut().unwrap();
1055        match hash {
1056            Some(hash) => Ok(hash),
1057            None => {
1058                let new_hash = self.compute_hash().await?;
1059                let hash = self.hash.get_mut().unwrap();
1060                *hash = Some(new_hash);
1061                Ok(new_hash)
1062            }
1063        }
1064    }
1065
1066    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1067        let hash = *self.hash.lock().unwrap();
1068        match hash {
1069            Some(hash) => Ok(hash),
1070            None => {
1071                let new_hash = self.compute_hash().await?;
1072                let mut hash = self.hash.lock().unwrap();
1073                *hash = Some(new_hash);
1074                Ok(new_hash)
1075            }
1076        }
1077    }
1078}
1079
1080/// Type wrapping `KeyValueStoreView` while memoizing the hash.
1081pub type HashedKeyValueStoreView<C> =
1082    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;
1083
1084/// Wrapper around `KeyValueStoreView` to compute hashes based on the history of changes.
1085pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1086
1087/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
1088#[cfg(with_testing)]
1089#[derive(Debug, Clone)]
1090pub struct ViewContainer<C> {
1091    view: Arc<RwLock<KeyValueStoreView<C>>>,
1092}
1093
1094#[cfg(with_testing)]
1095impl<C> WithError for ViewContainer<C> {
1096    type Error = ViewContainerError;
1097}
1098
1099#[cfg(with_testing)]
1100/// The error type for [`ViewContainer`] operations.
1101#[derive(Error, Debug)]
1102pub enum ViewContainerError {
1103    /// View error.
1104    #[error(transparent)]
1105    ViewError(#[from] ViewError),
1106
1107    /// BCS serialization error.
1108    #[error(transparent)]
1109    BcsError(#[from] bcs::Error),
1110}
1111
1112#[cfg(with_testing)]
1113impl KeyValueStoreError for ViewContainerError {
1114    const BACKEND: &'static str = "view_container";
1115}
1116
1117#[cfg(with_testing)]
1118impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
1119    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
1120
1121    fn max_stream_queries(&self) -> usize {
1122        1
1123    }
1124
1125    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
1126        Ok(Vec::new())
1127    }
1128
1129    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
1130        let view = self.view.read().await;
1131        Ok(view.get(key).await?)
1132    }
1133
1134    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
1135        let view = self.view.read().await;
1136        Ok(view.contains_key(key).await?)
1137    }
1138
1139    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
1140        let view = self.view.read().await;
1141        Ok(view.contains_keys(keys).await?)
1142    }
1143
1144    async fn read_multi_values_bytes(
1145        &self,
1146        keys: &[Vec<u8>],
1147    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
1148        let view = self.view.read().await;
1149        Ok(view.multi_get(keys).await?)
1150    }
1151
1152    async fn find_keys_by_prefix(
1153        &self,
1154        key_prefix: &[u8],
1155    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
1156        let view = self.view.read().await;
1157        Ok(view.find_keys_by_prefix(key_prefix).await?)
1158    }
1159
1160    async fn find_key_values_by_prefix(
1161        &self,
1162        key_prefix: &[u8],
1163    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
1164        let view = self.view.read().await;
1165        Ok(view.find_key_values_by_prefix(key_prefix).await?)
1166    }
1167}
1168
1169#[cfg(with_testing)]
1170impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
1171    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;
1172
1173    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
1174        let mut view = self.view.write().await;
1175        view.write_batch(batch)?;
1176        let mut batch = Batch::new();
1177        view.pre_save(&mut batch)?;
1178        view.post_save();
1179        view.context()
1180            .store()
1181            .write_batch(batch)
1182            .await
1183            .map_err(ViewError::from)?;
1184        Ok(())
1185    }
1186
1187    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
1188        Ok(())
1189    }
1190}
1191
1192#[cfg(with_testing)]
1193impl<C: Context> ViewContainer<C> {
1194    /// Creates a [`ViewContainer`].
1195    pub async fn new(context: C) -> Result<Self, ViewError> {
1196        let view = KeyValueStoreView::load(context).await?;
1197        let view = Arc::new(RwLock::new(view));
1198        Ok(Self { view })
1199    }
1200}