linera_views/views/collection_view.rs
1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Borrow,
6    collections::{btree_map, BTreeMap},
7    io::Write,
8    marker::PhantomData,
9    mem,
10};
11
12use async_lock::{RwLock, RwLockReadGuard};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use serde::{de::DeserializeOwned, Serialize};
16
17use crate::{
18    batch::Batch,
19    common::{CustomSerialize, HasherOutput, SliceExt as _, Update},
20    context::{BaseKey, Context},
21    hashable_wrapper::WrappedHashableContainerView,
22    historical_hash_wrapper::HistoricallyHashableView,
23    store::ReadableKeyValueStore as _,
24    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
25};
26
27#[cfg(with_metrics)]
28mod metrics {
29    use std::sync::LazyLock;
30
31    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
32    use prometheus::HistogramVec;
33
34    /// The runtime of hash computation
35    pub static COLLECTION_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
36        register_histogram_vec(
37            "collection_view_hash_runtime",
38            "CollectionView hash runtime",
39            &[],
40            exponential_bucket_latencies(5.0),
41        )
42    });
43}
44
45/// A view that supports accessing a collection of views of the same kind, indexed by a
46/// `Vec<u8>`, one subview at a time.
47#[derive(Debug)]
48pub struct ByteCollectionView<C, W> {
49    context: C,
50    delete_storage_first: bool,
51    updates: RwLock<BTreeMap<Vec<u8>, Update<W>>>,
52}
53
54/// A read-only accessor for a particular subview in a [`CollectionView`].
55pub enum ReadGuardedView<'a, W> {
56    /// The view is loaded in the updates
57    Loaded {
58        /// The guard for the updates.
59        updates: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>,
60        /// The key in question.
61        short_key: Vec<u8>,
62    },
63    /// The view is not loaded in the updates
64    NotLoaded {
65        /// The guard for the updates. It is needed so that it prevents
66        /// opening the view as write separately.
67        _updates: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Update<W>>>,
68        /// The view obtained from the storage
69        view: W,
70    },
71}
72
73impl<W> std::ops::Deref for ReadGuardedView<'_, W> {
74    type Target = W;
75
76    fn deref(&self) -> &W {
77        match self {
78            ReadGuardedView::Loaded { updates, short_key } => {
79                let Update::Set(view) = updates.get(short_key).unwrap() else {
80                    unreachable!();
81                };
82                view
83            }
84            ReadGuardedView::NotLoaded { _updates, view } => view,
85        }
86    }
87}
88
89/// We need to find new base keys in order to implement `CollectionView`.
90/// We do this by appending a value to the base key.
91///
92/// Sub-views in a collection share a common key prefix, like in other view types. However,
93/// just concatenating the shared prefix with sub-view keys makes it impossible to distinguish if a
94/// given key belongs to child sub-view or a grandchild sub-view (consider for example if a
95/// collection is stored inside the collection).
96#[repr(u8)]
97enum KeyTag {
98    /// Prefix for specifying an index and serves to indicate the existence of an entry in the collection.
99    Index = MIN_VIEW_TAG,
100    /// Prefix for specifying as the prefix for the sub-view.
101    Subview,
102}
103
104impl<W: View> View for ByteCollectionView<W::Context, W> {
105    const NUM_INIT_KEYS: usize = 0;
106
107    type Context = W::Context;
108
109    fn context(&self) -> &Self::Context {
110        &self.context
111    }
112
113    fn pre_load(_context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
114        Ok(vec![])
115    }
116
117    fn post_load(context: Self::Context, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
118        Ok(Self {
119            context,
120            delete_storage_first: false,
121            updates: RwLock::new(BTreeMap::new()),
122        })
123    }
124
125    fn rollback(&mut self) {
126        self.delete_storage_first = false;
127        self.updates.get_mut().clear();
128    }
129
130    async fn has_pending_changes(&self) -> bool {
131        if self.delete_storage_first {
132            return true;
133        }
134        let updates = self.updates.read().await;
135        !updates.is_empty()
136    }
137
138    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
139        let mut delete_view = false;
140        if self.delete_storage_first {
141            delete_view = true;
142            batch.delete_key_prefix(self.context.base_key().bytes.clone());
143            for (index, update) in mem::take(self.updates.get_mut()) {
144                if let Update::Set(mut view) = update {
145                    view.flush(batch)?;
146                    self.add_index(batch, &index);
147                    delete_view = false;
148                }
149            }
150        } else {
151            for (index, update) in mem::take(self.updates.get_mut()) {
152                match update {
153                    Update::Set(mut view) => {
154                        view.flush(batch)?;
155                        self.add_index(batch, &index);
156                    }
157                    Update::Removed => {
158                        let key_subview = self.get_subview_key(&index);
159                        let key_index = self.get_index_key(&index);
160                        batch.delete_key(key_index);
161                        batch.delete_key_prefix(key_subview);
162                    }
163                }
164            }
165        }
166        self.delete_storage_first = false;
167        Ok(delete_view)
168    }
169
170    fn clear(&mut self) {
171        self.delete_storage_first = true;
172        self.updates.get_mut().clear();
173    }
174}
175
176impl<W: ClonableView> ClonableView for ByteCollectionView<W::Context, W> {
177    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
178        let cloned_updates = self
179            .updates
180            .get_mut()
181            .iter_mut()
182            .map(|(key, value)| {
183                let cloned_value: Result<_, ViewError> = match value {
184                    Update::Removed => Ok(Update::Removed),
185                    Update::Set(view) => Ok(Update::Set(view.clone_unchecked()?)),
186                };
187                cloned_value.map(|v| (key.clone(), v))
188            })
189            .collect::<Result<_, ViewError>>()?;
190
191        Ok(ByteCollectionView {
192            context: self.context.clone(),
193            delete_storage_first: self.delete_storage_first,
194            updates: RwLock::new(cloned_updates),
195        })
196    }
197}
198
199impl<W: View> ByteCollectionView<W::Context, W> {
200    fn get_index_key(&self, index: &[u8]) -> Vec<u8> {
201        self.context
202            .base_key()
203            .base_tag_index(KeyTag::Index as u8, index)
204    }
205
206    fn get_subview_key(&self, index: &[u8]) -> Vec<u8> {
207        self.context
208            .base_key()
209            .base_tag_index(KeyTag::Subview as u8, index)
210    }
211
212    fn add_index(&self, batch: &mut Batch, index: &[u8]) {
213        let key = self.get_index_key(index);
214        batch.put_key_value_bytes(key, vec![]);
215    }
216
217    /// Loads a subview for the data at the given index in the collection. If an entry
218    /// is absent then a default entry is added to the collection. The resulting view
219    /// can be modified.
220    /// ```rust
221    /// # tokio_test::block_on(async {
222    /// # use linera_views::context::MemoryContext;
223    /// # use linera_views::collection_view::ByteCollectionView;
224    /// # use linera_views::register_view::RegisterView;
225    /// # use linera_views::views::View;
226    /// # let context = MemoryContext::new_for_testing(());
227    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
228    ///     ByteCollectionView::load(context).await.unwrap();
229    /// let subview = view.load_entry_mut(&[0, 1]).await.unwrap();
230    /// let value = subview.get();
231    /// assert_eq!(*value, String::default());
232    /// # })
233    /// ```
234    pub async fn load_entry_mut(&mut self, short_key: &[u8]) -> Result<&mut W, ViewError> {
235        match self.updates.get_mut().entry(short_key.to_vec()) {
236            btree_map::Entry::Occupied(entry) => {
237                let entry = entry.into_mut();
238                match entry {
239                    Update::Set(view) => Ok(view),
240                    Update::Removed => {
241                        let key = self
242                            .context
243                            .base_key()
244                            .base_tag_index(KeyTag::Subview as u8, short_key);
245                        let context = self.context.clone_with_base_key(key);
246                        // Obtain a view and set its pending state to the default (e.g. empty) state
247                        let view = W::new(context)?;
248                        *entry = Update::Set(view);
249                        let Update::Set(view) = entry else {
250                            unreachable!();
251                        };
252                        Ok(view)
253                    }
254                }
255            }
256            btree_map::Entry::Vacant(entry) => {
257                let key = self
258                    .context
259                    .base_key()
260                    .base_tag_index(KeyTag::Subview as u8, short_key);
261                let context = self.context.clone_with_base_key(key);
262                let view = if self.delete_storage_first {
263                    W::new(context)?
264                } else {
265                    W::load(context).await?
266                };
267                let Update::Set(view) = entry.insert(Update::Set(view)) else {
268                    unreachable!();
269                };
270                Ok(view)
271            }
272        }
273    }
274
275    /// Loads a subview for the data at the given index in the collection. If an entry
276    /// is absent then `None` is returned. The resulting view cannot be modified.
277    /// May fail if one subview is already being visited.
278    /// ```rust
279    /// # tokio_test::block_on(async {
280    /// # use linera_views::context::MemoryContext;
281    /// # use linera_views::collection_view::ByteCollectionView;
282    /// # use linera_views::register_view::RegisterView;
283    /// # use linera_views::views::View;
284    /// # let context = MemoryContext::new_for_testing(());
285    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
286    ///     ByteCollectionView::load(context).await.unwrap();
287    /// {
288    ///     let _subview = view.load_entry_mut(&[0, 1]).await.unwrap();
289    /// }
290    /// {
291    ///     let subview = view.try_load_entry(&[0, 1]).await.unwrap().unwrap();
292    ///     let value = subview.get();
293    ///     assert_eq!(*value, String::default());
294    /// }
295    /// assert!(view.try_load_entry(&[0, 2]).await.unwrap().is_none());
296    /// # })
297    /// ```
298    pub async fn try_load_entry(
299        &self,
300        short_key: &[u8],
301    ) -> Result<Option<ReadGuardedView<W>>, ViewError> {
302        let updates = self.updates.read().await;
303        match updates.get(short_key) {
304            Some(update) => match update {
305                Update::Removed => Ok(None),
306                Update::Set(_) => Ok(Some(ReadGuardedView::Loaded {
307                    updates,
308                    short_key: short_key.to_vec(),
309                })),
310            },
311            None => {
312                let key_index = self
313                    .context
314                    .base_key()
315                    .base_tag_index(KeyTag::Index as u8, short_key);
316                if !self.delete_storage_first
317                    && self.context.store().contains_key(&key_index).await?
318                {
319                    let key = self
320                        .context
321                        .base_key()
322                        .base_tag_index(KeyTag::Subview as u8, short_key);
323                    let context = self.context.clone_with_base_key(key);
324                    let view = W::load(context).await?;
325                    Ok(Some(ReadGuardedView::NotLoaded {
326                        _updates: updates,
327                        view,
328                    }))
329                } else {
330                    Ok(None)
331                }
332            }
333        }
334    }
335
336    /// Load multiple entries for reading at once.
337    /// The entries in `short_keys` have to be all distinct.
338    /// ```rust
339    /// # tokio_test::block_on(async {
340    /// # use linera_views::context::MemoryContext;
341    /// # use linera_views::collection_view::ByteCollectionView;
342    /// # use linera_views::register_view::RegisterView;
343    /// # use linera_views::views::View;
344    /// # let context = MemoryContext::new_for_testing(());
345    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
346    ///     ByteCollectionView::load(context).await.unwrap();
347    /// {
348    ///     let _subview = view.load_entry_mut(&[0, 1]).await.unwrap();
349    /// }
350    /// let short_keys = vec![vec![0, 1], vec![2, 3]];
351    /// let subviews = view.try_load_entries(short_keys).await.unwrap();
352    /// let value0 = subviews[0].as_ref().unwrap().get();
353    /// assert_eq!(*value0, String::default());
354    /// # })
355    /// ```
356    pub async fn try_load_entries(
357        &self,
358        short_keys: Vec<Vec<u8>>,
359    ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> {
360        let mut results = Vec::with_capacity(short_keys.len());
361        let mut keys_to_check = Vec::new();
362        let mut keys_to_check_metadata = Vec::new();
363        let updates = self.updates.read().await;
364
365        for (position, short_key) in short_keys.into_iter().enumerate() {
366            match updates.get(&short_key) {
367                Some(update) => match update {
368                    Update::Removed => {
369                        results.push(None);
370                    }
371                    Update::Set(_) => {
372                        let updates = self.updates.read().await;
373                        results.push(Some(ReadGuardedView::Loaded {
374                            updates,
375                            short_key: short_key.clone(),
376                        }));
377                    }
378                },
379                None => {
380                    results.push(None); // Placeholder, may be updated later
381                    if !self.delete_storage_first {
382                        let key = self
383                            .context
384                            .base_key()
385                            .base_tag_index(KeyTag::Subview as u8, &short_key);
386                        let subview_context = self.context.clone_with_base_key(key);
387                        let key = self
388                            .context
389                            .base_key()
390                            .base_tag_index(KeyTag::Index as u8, &short_key);
391                        keys_to_check.push(key);
392                        keys_to_check_metadata.push((position, subview_context));
393                    }
394                }
395            }
396        }
397
398        let found_keys = self.context.store().contains_keys(keys_to_check).await?;
399        let entries_to_load = keys_to_check_metadata
400            .into_iter()
401            .zip(found_keys)
402            .filter_map(|(metadata, found)| found.then_some(metadata))
403            .collect::<Vec<_>>();
404
405        let mut keys_to_load = Vec::with_capacity(entries_to_load.len() * W::NUM_INIT_KEYS);
406        for (_, context) in &entries_to_load {
407            keys_to_load.extend(W::pre_load(context)?);
408        }
409        let values = self
410            .context
411            .store()
412            .read_multi_values_bytes(keys_to_load)
413            .await?;
414
415        for (loaded_values, (position, context)) in values
416            .chunks_exact_or_repeat(W::NUM_INIT_KEYS)
417            .zip(entries_to_load)
418        {
419            let view = W::post_load(context, loaded_values)?;
420            let updates = self.updates.read().await;
421            results[position] = Some(ReadGuardedView::NotLoaded {
422                _updates: updates,
423                view,
424            });
425        }
426
427        Ok(results)
428    }
429
430    /// Loads multiple entries for reading at once with their keys.
431    /// ```rust
432    /// # tokio_test::block_on(async {
433    /// # use linera_views::context::MemoryContext;
434    /// # use linera_views::collection_view::ByteCollectionView;
435    /// # use linera_views::register_view::RegisterView;
436    /// # use linera_views::views::View;
437    /// # let context = MemoryContext::new_for_testing(());
438    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
439    ///     ByteCollectionView::load(context).await.unwrap();
440    /// {
441    ///     let subview = view.load_entry_mut(&vec![0, 1]).await.unwrap();
442    ///     subview.set("Bonjour".into());
443    /// }
444    /// let short_keys = vec![vec![0, 1], vec![0, 2]];
445    /// let pairs = view.try_load_entries_pairs(short_keys).await.unwrap();
446    /// assert_eq!(pairs[0].0, vec![0, 1]);
447    /// assert_eq!(pairs[1].0, vec![0, 2]);
448    /// let value0 = pairs[0].1.as_ref().unwrap().get();
449    /// assert_eq!(*value0, "Bonjour".to_string());
450    /// assert!(pairs[1].1.is_none());
451    /// # })
452    /// ```
453    pub async fn try_load_entries_pairs(
454        &self,
455        short_keys: Vec<Vec<u8>>,
456    ) -> Result<Vec<(Vec<u8>, Option<ReadGuardedView<W>>)>, ViewError> {
457        let values = self.try_load_entries(short_keys.clone()).await?;
458        Ok(short_keys.into_iter().zip(values).collect())
459    }
460
461    /// Load all entries for reading at once.
462    /// ```rust
463    /// # tokio_test::block_on(async {
464    /// # use linera_views::context::MemoryContext;
465    /// # use linera_views::collection_view::ByteCollectionView;
466    /// # use linera_views::register_view::RegisterView;
467    /// # use linera_views::views::View;
468    /// # let context = MemoryContext::new_for_testing(());
469    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
470    ///     ByteCollectionView::load(context).await.unwrap();
471    /// {
472    ///     let _subview = view.load_entry_mut(&[0, 1]).await.unwrap();
473    /// }
474    /// let subviews = view.try_load_all_entries().await.unwrap();
475    /// assert_eq!(subviews.len(), 1);
476    /// # })
477    /// ```
478    pub async fn try_load_all_entries(
479        &self,
480    ) -> Result<Vec<(Vec<u8>, ReadGuardedView<W>)>, ViewError> {
481        let updates = self.updates.read().await; // Acquire the read lock to prevent writes.
482        let short_keys = self.keys().await?;
483        let mut results = Vec::with_capacity(short_keys.len());
484
485        let mut keys_to_load = Vec::new();
486        let mut keys_to_load_metadata = Vec::new();
487        for (position, short_key) in short_keys.iter().enumerate() {
488            match updates.get(short_key) {
489                Some(update) => {
490                    let Update::Set(_) = update else {
491                        unreachable!();
492                    };
493                    let updates = self.updates.read().await;
494                    let view = ReadGuardedView::Loaded {
495                        updates,
496                        short_key: short_key.clone(),
497                    };
498                    results.push((short_key.clone(), Some(view)));
499                }
500                None => {
501                    // If a key is not in `updates`, then it is in storage.
502                    // The key exists since otherwise it would not be in `short_keys`.
503                    // Therefore we have `self.delete_storage_first = false`.
504                    assert!(!self.delete_storage_first);
505                    results.push((short_key.clone(), None));
506                    let key = self
507                        .context
508                        .base_key()
509                        .base_tag_index(KeyTag::Subview as u8, short_key);
510                    let subview_context = self.context.clone_with_base_key(key);
511                    keys_to_load.extend(W::pre_load(&subview_context)?);
512                    keys_to_load_metadata.push((position, subview_context, short_key.clone()));
513                }
514            }
515        }
516
517        let values = self
518            .context
519            .store()
520            .read_multi_values_bytes(keys_to_load)
521            .await?;
522
523        for (loaded_values, (position, context, short_key)) in values
524            .chunks_exact_or_repeat(W::NUM_INIT_KEYS)
525            .zip(keys_to_load_metadata)
526        {
527            let view = W::post_load(context, loaded_values)?;
528            let updates = self.updates.read().await;
529            let guarded_view = ReadGuardedView::NotLoaded {
530                _updates: updates,
531                view,
532            };
533            results[position] = (short_key, Some(guarded_view));
534        }
535
536        Ok(results
537            .into_iter()
538            .map(|(short_key, view)| (short_key, view.unwrap()))
539            .collect::<Vec<_>>())
540    }
541
542    /// Resets an entry to the default value.
543    /// ```rust
544    /// # tokio_test::block_on(async {
545    /// # use linera_views::context::MemoryContext;
546    /// # use linera_views::collection_view::ByteCollectionView;
547    /// # use linera_views::register_view::RegisterView;
548    /// # use linera_views::views::View;
549    /// # let context = MemoryContext::new_for_testing(());
550    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
551    ///     ByteCollectionView::load(context).await.unwrap();
552    /// let subview = view.load_entry_mut(&[0, 1]).await.unwrap();
553    /// let value = subview.get_mut();
554    /// *value = String::from("Hello");
555    /// view.reset_entry_to_default(&[0, 1]).unwrap();
556    /// let subview = view.load_entry_mut(&[0, 1]).await.unwrap();
557    /// let value = subview.get_mut();
558    /// assert_eq!(*value, String::default());
559    /// # })
560    /// ```
561    pub fn reset_entry_to_default(&mut self, short_key: &[u8]) -> Result<(), ViewError> {
562        let key = self
563            .context
564            .base_key()
565            .base_tag_index(KeyTag::Subview as u8, short_key);
566        let context = self.context.clone_with_base_key(key);
567        let view = W::new(context)?;
568        self.updates
569            .get_mut()
570            .insert(short_key.to_vec(), Update::Set(view));
571        Ok(())
572    }
573
574    /// Tests if the collection contains a specified key and returns a boolean.
575    /// ```rust
576    /// # tokio_test::block_on(async {
577    /// # use linera_views::context::MemoryContext;
578    /// # use linera_views::collection_view::ByteCollectionView;
579    /// # use linera_views::register_view::RegisterView;
580    /// # use linera_views::views::View;
581    /// # let context = MemoryContext::new_for_testing(());
582    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
583    ///     ByteCollectionView::load(context).await.unwrap();
584    /// {
585    ///     let _subview = view.load_entry_mut(&[0, 1]).await.unwrap();
586    /// }
587    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
588    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
589    /// # })
590    /// ```
591    pub async fn contains_key(&self, short_key: &[u8]) -> Result<bool, ViewError> {
592        let updates = self.updates.read().await;
593        Ok(match updates.get(short_key) {
594            Some(entry) => match entry {
595                Update::Set(_view) => true,
596                _entry @ Update::Removed => false,
597            },
598            None => {
599                let key_index = self
600                    .context
601                    .base_key()
602                    .base_tag_index(KeyTag::Index as u8, short_key);
603                !self.delete_storage_first && self.context.store().contains_key(&key_index).await?
604            }
605        })
606    }
607
608    /// Marks the entry as removed. If absent then nothing is done.
609    /// ```rust
610    /// # tokio_test::block_on(async {
611    /// # use linera_views::context::MemoryContext;
612    /// # use linera_views::collection_view::ByteCollectionView;
613    /// # use linera_views::register_view::RegisterView;
614    /// # use linera_views::views::View;
615    /// # let context = MemoryContext::new_for_testing(());
616    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
617    ///     ByteCollectionView::load(context).await.unwrap();
618    /// let subview = view.load_entry_mut(&[0, 1]).await.unwrap();
619    /// let value = subview.get_mut();
620    /// assert_eq!(*value, String::default());
621    /// view.remove_entry(vec![0, 1]);
622    /// let keys = view.keys().await.unwrap();
623    /// assert_eq!(keys.len(), 0);
624    /// # })
625    /// ```
626    pub fn remove_entry(&mut self, short_key: Vec<u8>) {
627        if self.delete_storage_first {
628            // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
629            self.updates.get_mut().remove(&short_key);
630        } else {
631            self.updates.get_mut().insert(short_key, Update::Removed);
632        }
633    }
634
635    /// Gets the extra data.
636    pub fn extra(&self) -> &<W::Context as Context>::Extra {
637        self.context.extra()
638    }
639}
640
641impl<W: View> ByteCollectionView<W::Context, W> {
642    /// Applies a function f on each index (aka key). Keys are visited in the
643    /// lexicographic order. If the function returns false, then the loop
644    /// ends prematurely.
645    /// ```rust
646    /// # tokio_test::block_on(async {
647    /// # use linera_views::context::MemoryContext;
648    /// # use linera_views::collection_view::ByteCollectionView;
649    /// # use linera_views::register_view::RegisterView;
650    /// # use linera_views::views::View;
651    /// # let context = MemoryContext::new_for_testing(());
652    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
653    ///     ByteCollectionView::load(context).await.unwrap();
654    /// view.load_entry_mut(&[0, 1]).await.unwrap();
655    /// view.load_entry_mut(&[0, 2]).await.unwrap();
656    /// let mut count = 0;
657    /// view.for_each_key_while(|_key| {
658    ///     count += 1;
659    ///     Ok(count < 1)
660    /// })
661    /// .await
662    /// .unwrap();
663    /// assert_eq!(count, 1);
664    /// # })
665    /// ```
666    pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
667    where
668        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
669    {
670        let updates = self.updates.read().await;
671        let mut updates = updates.iter();
672        let mut update = updates.next();
673        if !self.delete_storage_first {
674            let base = self.get_index_key(&[]);
675            for index in self.context.store().find_keys_by_prefix(&base).await? {
676                loop {
677                    match update {
678                        Some((key, value)) if key <= &index => {
679                            if let Update::Set(_) = value {
680                                if !f(key)? {
681                                    return Ok(());
682                                }
683                            }
684                            update = updates.next();
685                            if key == &index {
686                                break;
687                            }
688                        }
689                        _ => {
690                            if !f(&index)? {
691                                return Ok(());
692                            }
693                            break;
694                        }
695                    }
696                }
697            }
698        }
699        while let Some((key, value)) = update {
700            if let Update::Set(_) = value {
701                if !f(key)? {
702                    return Ok(());
703                }
704            }
705            update = updates.next();
706        }
707        Ok(())
708    }
709
710    /// Applies a function f on each index (aka key). Keys are visited in a
711    /// lexicographic order.
712    /// ```rust
713    /// # tokio_test::block_on(async {
714    /// # use linera_views::context::MemoryContext;
715    /// # use linera_views::collection_view::ByteCollectionView;
716    /// # use linera_views::register_view::RegisterView;
717    /// # use linera_views::views::View;
718    /// # let context = MemoryContext::new_for_testing(());
719    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
720    ///     ByteCollectionView::load(context).await.unwrap();
721    /// view.load_entry_mut(&[0, 1]).await.unwrap();
722    /// view.load_entry_mut(&[0, 2]).await.unwrap();
723    /// let mut count = 0;
724    /// view.for_each_key(|_key| {
725    ///     count += 1;
726    ///     Ok(())
727    /// })
728    /// .await
729    /// .unwrap();
730    /// assert_eq!(count, 2);
731    /// # })
732    /// ```
733    pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
734    where
735        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
736    {
737        self.for_each_key_while(|key| {
738            f(key)?;
739            Ok(true)
740        })
741        .await
742    }
743
744    /// Returns the list of keys in the collection. The order is lexicographic.
745    /// ```rust
746    /// # tokio_test::block_on(async {
747    /// # use linera_views::context::MemoryContext;
748    /// # use linera_views::collection_view::ByteCollectionView;
749    /// # use linera_views::register_view::RegisterView;
750    /// # use linera_views::views::View;
751    /// # let context = MemoryContext::new_for_testing(());
752    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
753    ///     ByteCollectionView::load(context).await.unwrap();
754    /// view.load_entry_mut(&[0, 1]).await.unwrap();
755    /// view.load_entry_mut(&[0, 2]).await.unwrap();
756    /// let keys = view.keys().await.unwrap();
757    /// assert_eq!(keys, vec![vec![0, 1], vec![0, 2]]);
758    /// # })
759    /// ```
760    pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
761        let mut keys = Vec::new();
762        self.for_each_key(|key| {
763            keys.push(key.to_vec());
764            Ok(())
765        })
766        .await?;
767        Ok(keys)
768    }
769
770    /// Returns the number of entries in the collection.
771    /// ```rust
772    /// # tokio_test::block_on(async {
773    /// # use linera_views::context::MemoryContext;
774    /// # use linera_views::collection_view::ByteCollectionView;
775    /// # use linera_views::register_view::RegisterView;
776    /// # use linera_views::views::View;
777    /// # let context = MemoryContext::new_for_testing(());
778    /// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
779    ///     ByteCollectionView::load(context).await.unwrap();
780    /// view.load_entry_mut(&[0, 1]).await.unwrap();
781    /// view.load_entry_mut(&[0, 2]).await.unwrap();
782    /// assert_eq!(view.count().await.unwrap(), 2);
783    /// # })
784    /// ```
785    pub async fn count(&self) -> Result<usize, ViewError> {
786        let mut count = 0;
787        self.for_each_key(|_key| {
788            count += 1;
789            Ok(())
790        })
791        .await?;
792        Ok(count)
793    }
794}
795
796impl<W: HashableView> HashableView for ByteCollectionView<W::Context, W> {
797    type Hasher = sha3::Sha3_256;
798
799    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
800        #[cfg(with_metrics)]
801        let _hash_latency = metrics::COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
802        let mut hasher = sha3::Sha3_256::default();
803        let keys = self.keys().await?;
804        let count = keys.len() as u32;
805        hasher.update_with_bcs_bytes(&count)?;
806        let updates = self.updates.get_mut();
807        for key in keys {
808            hasher.update_with_bytes(&key)?;
809            let hash = match updates.get_mut(&key) {
810                Some(entry) => {
811                    let Update::Set(view) = entry else {
812                        unreachable!();
813                    };
814                    view.hash_mut().await?
815                }
816                None => {
817                    let key = self
818                        .context
819                        .base_key()
820                        .base_tag_index(KeyTag::Subview as u8, &key);
821                    let context = self.context.clone_with_base_key(key);
822                    let mut view = W::load(context).await?;
823                    view.hash_mut().await?
824                }
825            };
826            hasher.write_all(hash.as_ref())?;
827        }
828        Ok(hasher.finalize())
829    }
830
831    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
832        #[cfg(with_metrics)]
833        let _hash_latency = metrics::COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
834        let mut hasher = sha3::Sha3_256::default();
835        let updates = self.updates.read().await; // Acquire the lock to prevent writes.
836        let keys = self.keys().await?;
837        let count = keys.len() as u32;
838        hasher.update_with_bcs_bytes(&count)?;
839        for key in keys {
840            hasher.update_with_bytes(&key)?;
841            let hash = match updates.get(&key) {
842                Some(entry) => {
843                    let Update::Set(view) = entry else {
844                        unreachable!();
845                    };
846                    view.hash().await?
847                }
848                None => {
849                    let key = self
850                        .context
851                        .base_key()
852                        .base_tag_index(KeyTag::Subview as u8, &key);
853                    let context = self.context.clone_with_base_key(key);
854                    let view = W::load(context).await?;
855                    view.hash().await?
856                }
857            };
858            hasher.write_all(hash.as_ref())?;
859        }
860        Ok(hasher.finalize())
861    }
862}
863
864/// A view that supports accessing a collection of views of the same kind, indexed by a
865/// key, one subview at a time.
866#[derive(Debug)]
867pub struct CollectionView<C, I, W> {
868    collection: ByteCollectionView<C, W>,
869    _phantom: PhantomData<I>,
870}
871
872impl<W: View, I> View for CollectionView<W::Context, I, W>
873where
874    I: Send + Sync + Serialize + DeserializeOwned,
875{
876    const NUM_INIT_KEYS: usize = ByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
877
878    type Context = W::Context;
879
880    fn context(&self) -> &Self::Context {
881        self.collection.context()
882    }
883
884    fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
885        ByteCollectionView::<W::Context, W>::pre_load(context)
886    }
887
888    fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
889        let collection = ByteCollectionView::post_load(context, values)?;
890        Ok(CollectionView {
891            collection,
892            _phantom: PhantomData,
893        })
894    }
895
896    fn rollback(&mut self) {
897        self.collection.rollback()
898    }
899
900    async fn has_pending_changes(&self) -> bool {
901        self.collection.has_pending_changes().await
902    }
903
904    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
905        self.collection.flush(batch)
906    }
907
908    fn clear(&mut self) {
909        self.collection.clear()
910    }
911}
912
913impl<I, W: ClonableView> ClonableView for CollectionView<W::Context, I, W>
914where
915    I: Send + Sync + Serialize + DeserializeOwned,
916{
917    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
918        Ok(CollectionView {
919            collection: self.collection.clone_unchecked()?,
920            _phantom: PhantomData,
921        })
922    }
923}
924
925impl<I: Serialize, W: View> CollectionView<W::Context, I, W> {
926    /// Loads a subview for the data at the given index in the collection. If an entry
927    /// is absent then a default entry is added to the collection. The resulting view
928    /// can be modified.
929    /// ```rust
930    /// # tokio_test::block_on(async {
931    /// # use linera_views::context::MemoryContext;
932    /// # use linera_views::collection_view::CollectionView;
933    /// # use linera_views::register_view::RegisterView;
934    /// # use linera_views::views::View;
935    /// # let context = MemoryContext::new_for_testing(());
936    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
937    ///     CollectionView::load(context).await.unwrap();
938    /// let subview = view.load_entry_mut(&23).await.unwrap();
939    /// let value = subview.get();
940    /// assert_eq!(*value, String::default());
941    /// # })
942    /// ```
943    pub async fn load_entry_mut<Q>(&mut self, index: &Q) -> Result<&mut W, ViewError>
944    where
945        I: Borrow<Q>,
946        Q: Serialize + ?Sized,
947    {
948        let short_key = BaseKey::derive_short_key(index)?;
949        self.collection.load_entry_mut(&short_key).await
950    }
951
952    /// Loads a subview for the data at the given index in the collection. If an entry
953    /// is absent then `None` is returned. The resulting view cannot be modified.
954    /// May fail if one subview is already being visited.
955    /// ```rust
956    /// # tokio_test::block_on(async {
957    /// # use linera_views::context::MemoryContext;
958    /// # use linera_views::collection_view::CollectionView;
959    /// # use linera_views::register_view::RegisterView;
960    /// # use linera_views::views::View;
961    /// # let context = MemoryContext::new_for_testing(());
962    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
963    ///     CollectionView::load(context).await.unwrap();
964    /// {
965    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
966    /// }
967    /// {
968    ///     let subview = view.try_load_entry(&23).await.unwrap().unwrap();
969    ///     let value = subview.get();
970    ///     assert_eq!(*value, String::default());
971    /// }
972    /// assert!(view.try_load_entry(&24).await.unwrap().is_none());
973    /// # })
974    /// ```
975    pub async fn try_load_entry<Q>(
976        &self,
977        index: &Q,
978    ) -> Result<Option<ReadGuardedView<W>>, ViewError>
979    where
980        I: Borrow<Q>,
981        Q: Serialize + ?Sized,
982    {
983        let short_key = BaseKey::derive_short_key(index)?;
984        self.collection.try_load_entry(&short_key).await
985    }
986
987    /// Load multiple entries for reading at once.
988    /// The entries in indices have to be all distinct.
989    /// ```rust
990    /// # tokio_test::block_on(async {
991    /// # use linera_views::context::MemoryContext;
992    /// # use linera_views::collection_view::CollectionView;
993    /// # use linera_views::register_view::RegisterView;
994    /// # use linera_views::views::View;
995    /// # let context = MemoryContext::new_for_testing(());
996    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
997    ///     CollectionView::load(context).await.unwrap();
998    /// {
999    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
1000    /// }
1001    /// let indices = vec![23, 24];
1002    /// let subviews = view.try_load_entries(&indices).await.unwrap();
1003    /// let value0 = subviews[0].as_ref().unwrap().get();
1004    /// assert_eq!(*value0, String::default());
1005    /// # })
1006    /// ```
1007    pub async fn try_load_entries<'a, Q>(
1008        &self,
1009        indices: impl IntoIterator<Item = &'a Q>,
1010    ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1011    where
1012        I: Borrow<Q>,
1013        Q: Serialize + 'a,
1014    {
1015        let short_keys = indices
1016            .into_iter()
1017            .map(|index| BaseKey::derive_short_key(index))
1018            .collect::<Result<_, _>>()?;
1019        self.collection.try_load_entries(short_keys).await
1020    }
1021
1022    /// Loads multiple entries for reading at once with their keys.
1023    /// The entries in indices have to be all distinct.
1024    /// ```rust
1025    /// # tokio_test::block_on(async {
1026    /// # use linera_views::context::MemoryContext;
1027    /// # use linera_views::collection_view::CollectionView;
1028    /// # use linera_views::register_view::RegisterView;
1029    /// # use linera_views::views::View;
1030    /// # let context = MemoryContext::new_for_testing(());
1031    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1032    ///     CollectionView::load(context).await.unwrap();
1033    /// {
1034    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
1035    /// }
1036    /// let indices = [23, 24];
1037    /// let subviews = view.try_load_entries_pairs(indices).await.unwrap();
1038    /// let value0 = subviews[0].1.as_ref().unwrap().get();
1039    /// assert_eq!(*value0, String::default());
1040    /// # })
1041    /// ```
1042    pub async fn try_load_entries_pairs<Q>(
1043        &self,
1044        indices: impl IntoIterator<Item = Q>,
1045    ) -> Result<Vec<(Q, Option<ReadGuardedView<W>>)>, ViewError>
1046    where
1047        I: Borrow<Q>,
1048        Q: Serialize + Clone,
1049    {
1050        let indices_vec: Vec<Q> = indices.into_iter().collect();
1051        let values = self.try_load_entries(indices_vec.iter()).await?;
1052        Ok(indices_vec.into_iter().zip(values).collect())
1053    }
1054
1055    /// Load all entries for reading at once.
1056    /// ```rust
1057    /// # tokio_test::block_on(async {
1058    /// # use linera_views::context::MemoryContext;
1059    /// # use linera_views::collection_view::CollectionView;
1060    /// # use linera_views::register_view::RegisterView;
1061    /// # use linera_views::views::View;
1062    /// # let context = MemoryContext::new_for_testing(());
1063    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1064    ///     CollectionView::load(context).await.unwrap();
1065    /// {
1066    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
1067    /// }
1068    /// let subviews = view.try_load_all_entries().await.unwrap();
1069    /// assert_eq!(subviews.len(), 1);
1070    /// # })
1071    /// ```
1072    pub async fn try_load_all_entries(&self) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError>
1073    where
1074        I: DeserializeOwned,
1075    {
1076        let results = self.collection.try_load_all_entries().await?;
1077        results
1078            .into_iter()
1079            .map(|(short_key, view)| {
1080                let index = BaseKey::deserialize_value(&short_key)?;
1081                Ok((index, view))
1082            })
1083            .collect()
1084    }
1085
1086    /// Resets an entry to the default value.
1087    /// ```rust
1088    /// # tokio_test::block_on(async {
1089    /// # use linera_views::context::MemoryContext;
1090    /// # use linera_views::collection_view::CollectionView;
1091    /// # use linera_views::register_view::RegisterView;
1092    /// # use linera_views::views::View;
1093    /// # let context = MemoryContext::new_for_testing(());
1094    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1095    ///     CollectionView::load(context).await.unwrap();
1096    /// let subview = view.load_entry_mut(&23).await.unwrap();
1097    /// let value = subview.get_mut();
1098    /// *value = String::from("Hello");
1099    /// view.reset_entry_to_default(&23).unwrap();
1100    /// let subview = view.load_entry_mut(&23).await.unwrap();
1101    /// let value = subview.get_mut();
1102    /// assert_eq!(*value, String::default());
1103    /// # })
1104    /// ```
1105    pub fn reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1106    where
1107        I: Borrow<Q>,
1108        Q: Serialize + ?Sized,
1109    {
1110        let short_key = BaseKey::derive_short_key(index)?;
1111        self.collection.reset_entry_to_default(&short_key)
1112    }
1113
1114    /// Removes an entry from the `CollectionView`. If absent nothing happens.
1115    /// ```rust
1116    /// # tokio_test::block_on(async {
1117    /// # use linera_views::context::MemoryContext;
1118    /// # use linera_views::collection_view::CollectionView;
1119    /// # use linera_views::register_view::RegisterView;
1120    /// # use linera_views::views::View;
1121    /// # let context = MemoryContext::new_for_testing(());
1122    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1123    ///     CollectionView::load(context).await.unwrap();
1124    /// let subview = view.load_entry_mut(&23).await.unwrap();
1125    /// let value = subview.get_mut();
1126    /// assert_eq!(*value, String::default());
1127    /// view.remove_entry(&23);
1128    /// let keys = view.indices().await.unwrap();
1129    /// assert_eq!(keys.len(), 0);
1130    /// # })
1131    /// ```
1132    pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1133    where
1134        I: Borrow<Q>,
1135        Q: Serialize + ?Sized,
1136    {
1137        let short_key = BaseKey::derive_short_key(index)?;
1138        self.collection.remove_entry(short_key);
1139        Ok(())
1140    }
1141
1142    /// Gets the extra data.
1143    pub fn extra(&self) -> &<W::Context as Context>::Extra {
1144        self.collection.extra()
1145    }
1146}
1147
1148impl<I, W: View> CollectionView<W::Context, I, W>
1149where
1150    I: Sync + Send + Serialize + DeserializeOwned,
1151{
1152    /// Returns the list of indices in the collection in the order determined by
1153    /// the serialization.
1154    /// ```rust
1155    /// # tokio_test::block_on(async {
1156    /// # use linera_views::context::MemoryContext;
1157    /// # use linera_views::collection_view::CollectionView;
1158    /// # use linera_views::register_view::RegisterView;
1159    /// # use linera_views::views::View;
1160    /// # let context = MemoryContext::new_for_testing(());
1161    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1162    ///     CollectionView::load(context).await.unwrap();
1163    /// view.load_entry_mut(&23).await.unwrap();
1164    /// view.load_entry_mut(&25).await.unwrap();
1165    /// let indices = view.indices().await.unwrap();
1166    /// assert_eq!(indices.len(), 2);
1167    /// # })
1168    /// ```
1169    pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1170        let mut indices = Vec::new();
1171        self.for_each_index(|index| {
1172            indices.push(index);
1173            Ok(())
1174        })
1175        .await?;
1176        Ok(indices)
1177    }
1178
1179    /// Returns the number of entries in the collection.
1180    /// ```rust
1181    /// # tokio_test::block_on(async {
1182    /// # use linera_views::context::MemoryContext;
1183    /// # use linera_views::collection_view::CollectionView;
1184    /// # use linera_views::register_view::RegisterView;
1185    /// # use linera_views::views::View;
1186    /// # let context = MemoryContext::new_for_testing(());
1187    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1188    ///     CollectionView::load(context).await.unwrap();
1189    /// view.load_entry_mut(&23).await.unwrap();
1190    /// view.load_entry_mut(&25).await.unwrap();
1191    /// assert_eq!(view.count().await.unwrap(), 2);
1192    /// # })
1193    /// ```
1194    pub async fn count(&self) -> Result<usize, ViewError> {
1195        self.collection.count().await
1196    }
1197}
1198
1199impl<I: DeserializeOwned, W: View> CollectionView<W::Context, I, W> {
1200    /// Applies a function f on each index. Indices are visited in an order
1201    /// determined by the serialization. If the function returns false then
1202    /// the loop ends prematurely.
1203    /// ```rust
1204    /// # tokio_test::block_on(async {
1205    /// # use linera_views::context::MemoryContext;
1206    /// # use linera_views::collection_view::CollectionView;
1207    /// # use linera_views::register_view::RegisterView;
1208    /// # use linera_views::views::View;
1209    /// # let context = MemoryContext::new_for_testing(());
1210    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1211    ///     CollectionView::load(context).await.unwrap();
1212    /// view.load_entry_mut(&23).await.unwrap();
1213    /// view.load_entry_mut(&24).await.unwrap();
1214    /// let mut count = 0;
1215    /// view.for_each_index_while(|_key| {
1216    ///     count += 1;
1217    ///     Ok(count < 1)
1218    /// })
1219    /// .await
1220    /// .unwrap();
1221    /// assert_eq!(count, 1);
1222    /// # })
1223    /// ```
1224    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
1225    where
1226        F: FnMut(I) -> Result<bool, ViewError> + Send,
1227    {
1228        self.collection
1229            .for_each_key_while(|key| {
1230                let index = BaseKey::deserialize_value(key)?;
1231                f(index)
1232            })
1233            .await?;
1234        Ok(())
1235    }
1236
1237    /// Applies a function f on each index. Indices are visited in an order
1238    /// determined by the serialization.
1239    /// ```rust
1240    /// # tokio_test::block_on(async {
1241    /// # use linera_views::context::MemoryContext;
1242    /// # use linera_views::collection_view::CollectionView;
1243    /// # use linera_views::register_view::RegisterView;
1244    /// # use linera_views::views::View;
1245    /// # let context = MemoryContext::new_for_testing(());
1246    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1247    ///     CollectionView::load(context).await.unwrap();
1248    /// view.load_entry_mut(&23).await.unwrap();
1249    /// view.load_entry_mut(&28).await.unwrap();
1250    /// let mut count = 0;
1251    /// view.for_each_index(|_key| {
1252    ///     count += 1;
1253    ///     Ok(())
1254    /// })
1255    /// .await
1256    /// .unwrap();
1257    /// assert_eq!(count, 2);
1258    /// # })
1259    /// ```
1260    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
1261    where
1262        F: FnMut(I) -> Result<(), ViewError> + Send,
1263    {
1264        self.collection
1265            .for_each_key(|key| {
1266                let index = BaseKey::deserialize_value(key)?;
1267                f(index)
1268            })
1269            .await?;
1270        Ok(())
1271    }
1272}
1273
1274impl<I, W: HashableView> HashableView for CollectionView<W::Context, I, W>
1275where
1276    I: Send + Sync + Serialize + DeserializeOwned,
1277{
1278    type Hasher = sha3::Sha3_256;
1279
1280    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1281        self.collection.hash_mut().await
1282    }
1283
1284    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1285        self.collection.hash().await
1286    }
1287}
1288
1289/// A map view that serializes the indices.
1290#[derive(Debug)]
1291pub struct CustomCollectionView<C, I, W> {
1292    collection: ByteCollectionView<C, W>,
1293    _phantom: PhantomData<I>,
1294}
1295
1296impl<I: Send + Sync, W: View> View for CustomCollectionView<W::Context, I, W> {
1297    const NUM_INIT_KEYS: usize = ByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1298
1299    type Context = W::Context;
1300
1301    fn context(&self) -> &Self::Context {
1302        self.collection.context()
1303    }
1304
1305    fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1306        ByteCollectionView::<_, W>::pre_load(context)
1307    }
1308
1309    fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1310        let collection = ByteCollectionView::post_load(context, values)?;
1311        Ok(CustomCollectionView {
1312            collection,
1313            _phantom: PhantomData,
1314        })
1315    }
1316
1317    fn rollback(&mut self) {
1318        self.collection.rollback()
1319    }
1320
1321    async fn has_pending_changes(&self) -> bool {
1322        self.collection.has_pending_changes().await
1323    }
1324
1325    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1326        self.collection.flush(batch)
1327    }
1328
1329    fn clear(&mut self) {
1330        self.collection.clear()
1331    }
1332}
1333
1334impl<I: Send + Sync, W: ClonableView> ClonableView for CustomCollectionView<W::Context, I, W> {
1335    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
1336        Ok(CustomCollectionView {
1337            collection: self.collection.clone_unchecked()?,
1338            _phantom: PhantomData,
1339        })
1340    }
1341}
1342
1343impl<I: CustomSerialize, W: View> CustomCollectionView<W::Context, I, W> {
1344    /// Loads a subview for the data at the given index in the collection. If an entry
1345    /// is absent then a default entry is added to the collection. The resulting view
1346    /// can be modified.
1347    /// ```rust
1348    /// # tokio_test::block_on(async {
1349    /// # use linera_views::context::MemoryContext;
1350    /// # use linera_views::collection_view::CustomCollectionView;
1351    /// # use linera_views::register_view::RegisterView;
1352    /// # use linera_views::views::View;
1353    /// # let context = MemoryContext::new_for_testing(());
1354    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1355    ///     CustomCollectionView::load(context).await.unwrap();
1356    /// let subview = view.load_entry_mut(&23).await.unwrap();
1357    /// let value = subview.get();
1358    /// assert_eq!(*value, String::default());
1359    /// # })
1360    /// ```
1361    pub async fn load_entry_mut<Q>(&mut self, index: &Q) -> Result<&mut W, ViewError>
1362    where
1363        I: Borrow<Q>,
1364        Q: CustomSerialize,
1365    {
1366        let short_key = index.to_custom_bytes()?;
1367        self.collection.load_entry_mut(&short_key).await
1368    }
1369
1370    /// Loads a subview for the data at the given index in the collection. If an entry
1371    /// is absent then `None` is returned. The resulting view cannot be modified.
1372    /// May fail if one subview is already being visited.
1373    /// ```rust
1374    /// # tokio_test::block_on(async {
1375    /// # use linera_views::context::MemoryContext;
1376    /// # use linera_views::collection_view::CustomCollectionView;
1377    /// # use linera_views::register_view::RegisterView;
1378    /// # use linera_views::views::View;
1379    /// # let context = MemoryContext::new_for_testing(());
1380    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1381    ///     CustomCollectionView::load(context).await.unwrap();
1382    /// {
1383    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
1384    /// }
1385    /// {
1386    ///     let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1387    ///     let value = subview.get();
1388    ///     assert_eq!(*value, String::default());
1389    /// }
1390    /// assert!(view.try_load_entry(&24).await.unwrap().is_none());
1391    /// # })
1392    /// ```
1393    pub async fn try_load_entry<Q>(
1394        &self,
1395        index: &Q,
1396    ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1397    where
1398        I: Borrow<Q>,
1399        Q: CustomSerialize,
1400    {
1401        let short_key = index.to_custom_bytes()?;
1402        self.collection.try_load_entry(&short_key).await
1403    }
1404
1405    /// Load multiple entries for reading at once.
1406    /// The entries in indices have to be all distinct.
1407    /// ```rust
1408    /// # tokio_test::block_on(async {
1409    /// # use linera_views::context::MemoryContext;
1410    /// # use linera_views::collection_view::CustomCollectionView;
1411    /// # use linera_views::register_view::RegisterView;
1412    /// # use linera_views::views::View;
1413    /// # let context = MemoryContext::new_for_testing(());
1414    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1415    ///     CustomCollectionView::load(context).await.unwrap();
1416    /// {
1417    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
1418    /// }
1419    /// let subviews = view.try_load_entries(&[23, 42]).await.unwrap();
1420    /// let value0 = subviews[0].as_ref().unwrap().get();
1421    /// assert_eq!(*value0, String::default());
1422    /// # })
1423    /// ```
1424    pub async fn try_load_entries<'a, Q>(
1425        &self,
1426        indices: impl IntoIterator<Item = &'a Q>,
1427    ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1428    where
1429        I: Borrow<Q>,
1430        Q: CustomSerialize + 'a,
1431    {
1432        let short_keys = indices
1433            .into_iter()
1434            .map(|index| index.to_custom_bytes())
1435            .collect::<Result<_, _>>()?;
1436        self.collection.try_load_entries(short_keys).await
1437    }
1438
1439    /// Loads multiple entries for reading at once with their keys.
1440    /// The entries in indices have to be all distinct.
1441    /// ```rust
1442    /// # tokio_test::block_on(async {
1443    /// # use linera_views::context::MemoryContext;
1444    /// # use linera_views::collection_view::CustomCollectionView;
1445    /// # use linera_views::register_view::RegisterView;
1446    /// # use linera_views::views::View;
1447    /// # let context = MemoryContext::new_for_testing(());
1448    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1449    ///     CustomCollectionView::load(context).await.unwrap();
1450    /// {
1451    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
1452    /// }
1453    /// let indices = [23, 42];
1454    /// let subviews = view.try_load_entries_pairs(indices).await.unwrap();
1455    /// let value0 = subviews[0].1.as_ref().unwrap().get();
1456    /// assert_eq!(*value0, String::default());
1457    /// # })
1458    /// ```
1459    pub async fn try_load_entries_pairs<Q>(
1460        &self,
1461        indices: impl IntoIterator<Item = Q>,
1462    ) -> Result<Vec<(Q, Option<ReadGuardedView<W>>)>, ViewError>
1463    where
1464        I: Borrow<Q>,
1465        Q: CustomSerialize + Clone,
1466    {
1467        let indices_vec: Vec<Q> = indices.into_iter().collect();
1468        let values = self.try_load_entries(indices_vec.iter()).await?;
1469        Ok(indices_vec.into_iter().zip(values).collect())
1470    }
1471
1472    /// Load all entries for reading at once.
1473    /// ```rust
1474    /// # tokio_test::block_on(async {
1475    /// # use linera_views::context::MemoryContext;
1476    /// # use linera_views::collection_view::CustomCollectionView;
1477    /// # use linera_views::register_view::RegisterView;
1478    /// # use linera_views::views::View;
1479    /// # let context = MemoryContext::new_for_testing(());
1480    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1481    ///     CustomCollectionView::load(context).await.unwrap();
1482    /// {
1483    ///     let _subview = view.load_entry_mut(&23).await.unwrap();
1484    /// }
1485    /// let subviews = view.try_load_all_entries().await.unwrap();
1486    /// assert_eq!(subviews.len(), 1);
1487    /// # })
1488    /// ```
1489    pub async fn try_load_all_entries(&self) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError>
1490    where
1491        I: CustomSerialize,
1492    {
1493        let results = self.collection.try_load_all_entries().await?;
1494        results
1495            .into_iter()
1496            .map(|(short_key, view)| {
1497                let index = I::from_custom_bytes(&short_key)?;
1498                Ok((index, view))
1499            })
1500            .collect()
1501    }
1502
1503    /// Marks the entry so that it is removed in the next flush.
1504    /// ```rust
1505    /// # tokio_test::block_on(async {
1506    /// # use linera_views::context::MemoryContext;
1507    /// # use linera_views::collection_view::CustomCollectionView;
1508    /// # use linera_views::register_view::RegisterView;
1509    /// # use linera_views::views::View;
1510    /// # let context = MemoryContext::new_for_testing(());
1511    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1512    ///     CustomCollectionView::load(context).await.unwrap();
1513    /// let subview = view.load_entry_mut(&23).await.unwrap();
1514    /// let value = subview.get_mut();
1515    /// *value = String::from("Hello");
1516    /// view.reset_entry_to_default(&23).unwrap();
1517    /// let subview = view.load_entry_mut(&23).await.unwrap();
1518    /// let value = subview.get_mut();
1519    /// assert_eq!(*value, String::default());
1520    /// # })
1521    /// ```
1522    pub fn reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1523    where
1524        I: Borrow<Q>,
1525        Q: CustomSerialize,
1526    {
1527        let short_key = index.to_custom_bytes()?;
1528        self.collection.reset_entry_to_default(&short_key)
1529    }
1530
1531    /// Removes an entry from the `CollectionView`. If absent nothing happens.
1532    /// ```rust
1533    /// # tokio_test::block_on(async {
1534    /// # use linera_views::context::MemoryContext;
1535    /// # use linera_views::collection_view::CustomCollectionView;
1536    /// # use linera_views::register_view::RegisterView;
1537    /// # use linera_views::views::View;
1538    /// # let context = MemoryContext::new_for_testing(());
1539    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1540    ///     CustomCollectionView::load(context).await.unwrap();
1541    /// let subview = view.load_entry_mut(&23).await.unwrap();
1542    /// let value = subview.get_mut();
1543    /// assert_eq!(*value, String::default());
1544    /// view.remove_entry(&23);
1545    /// let keys = view.indices().await.unwrap();
1546    /// assert_eq!(keys.len(), 0);
1547    /// # })
1548    /// ```
1549    pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1550    where
1551        I: Borrow<Q>,
1552        Q: CustomSerialize,
1553    {
1554        let short_key = index.to_custom_bytes()?;
1555        self.collection.remove_entry(short_key);
1556        Ok(())
1557    }
1558
1559    /// Gets the extra data.
1560    pub fn extra(&self) -> &<W::Context as Context>::Extra {
1561        self.collection.extra()
1562    }
1563}
1564
1565impl<I: CustomSerialize + Send, W: View> CustomCollectionView<W::Context, I, W> {
1566    /// Returns the list of indices in the collection in the order determined by the custom serialization.
1567    /// ```rust
1568    /// # tokio_test::block_on(async {
1569    /// # use linera_views::context::MemoryContext;
1570    /// # use linera_views::collection_view::CustomCollectionView;
1571    /// # use linera_views::register_view::RegisterView;
1572    /// # use linera_views::views::View;
1573    /// # let context = MemoryContext::new_for_testing(());
1574    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1575    ///     CustomCollectionView::load(context).await.unwrap();
1576    /// view.load_entry_mut(&23).await.unwrap();
1577    /// view.load_entry_mut(&25).await.unwrap();
1578    /// let indices = view.indices().await.unwrap();
1579    /// assert_eq!(indices, vec![23, 25]);
1580    /// # })
1581    /// ```
1582    pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1583        let mut indices = Vec::new();
1584        self.for_each_index(|index| {
1585            indices.push(index);
1586            Ok(())
1587        })
1588        .await?;
1589        Ok(indices)
1590    }
1591
1592    /// Returns the number of entries in the collection.
1593    /// ```rust
1594    /// # tokio_test::block_on(async {
1595    /// # use linera_views::context::MemoryContext;
1596    /// # use linera_views::collection_view::CollectionView;
1597    /// # use linera_views::register_view::RegisterView;
1598    /// # use linera_views::views::View;
1599    /// # let context = MemoryContext::new_for_testing(());
1600    /// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1601    ///     CollectionView::load(context).await.unwrap();
1602    /// view.load_entry_mut(&23).await.unwrap();
1603    /// view.load_entry_mut(&25).await.unwrap();
1604    /// assert_eq!(view.count().await.unwrap(), 2);
1605    /// # })
1606    /// ```
1607    pub async fn count(&self) -> Result<usize, ViewError> {
1608        self.collection.count().await
1609    }
1610}
1611
1612impl<I: CustomSerialize, W: View> CustomCollectionView<W::Context, I, W> {
1613    /// Applies a function f on each index. Indices are visited in an order
1614    /// determined by the custom serialization. If the function f returns false,
1615    /// then the loop ends prematurely.
1616    /// ```rust
1617    /// # tokio_test::block_on(async {
1618    /// # use linera_views::context::MemoryContext;
1619    /// # use linera_views::collection_view::CustomCollectionView;
1620    /// # use linera_views::register_view::RegisterView;
1621    /// # use linera_views::views::View;
1622    /// # let context = MemoryContext::new_for_testing(());
1623    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1624    ///     CustomCollectionView::load(context).await.unwrap();
1625    /// view.load_entry_mut(&28).await.unwrap();
1626    /// view.load_entry_mut(&24).await.unwrap();
1627    /// view.load_entry_mut(&23).await.unwrap();
1628    /// let mut part_indices = Vec::new();
1629    /// view.for_each_index_while(|index| {
1630    ///     part_indices.push(index);
1631    ///     Ok(part_indices.len() < 2)
1632    /// })
1633    /// .await
1634    /// .unwrap();
1635    /// assert_eq!(part_indices, vec![23, 24]);
1636    /// # })
1637    /// ```
1638    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
1639    where
1640        F: FnMut(I) -> Result<bool, ViewError> + Send,
1641    {
1642        self.collection
1643            .for_each_key_while(|key| {
1644                let index = I::from_custom_bytes(key)?;
1645                f(index)
1646            })
1647            .await?;
1648        Ok(())
1649    }
1650
1651    /// Applies a function on each index. Indices are visited in an order
1652    /// determined by the custom serialization.
1653    /// ```rust
1654    /// # tokio_test::block_on(async {
1655    /// # use linera_views::context::MemoryContext;
1656    /// # use linera_views::collection_view::CustomCollectionView;
1657    /// # use linera_views::register_view::RegisterView;
1658    /// # use linera_views::views::View;
1659    /// # let context = MemoryContext::new_for_testing(());
1660    /// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1661    ///     CustomCollectionView::load(context).await.unwrap();
1662    /// view.load_entry_mut(&28).await.unwrap();
1663    /// view.load_entry_mut(&24).await.unwrap();
1664    /// view.load_entry_mut(&23).await.unwrap();
1665    /// let mut indices = Vec::new();
1666    /// view.for_each_index(|index| {
1667    ///     indices.push(index);
1668    ///     Ok(())
1669    /// })
1670    /// .await
1671    /// .unwrap();
1672    /// assert_eq!(indices, vec![23, 24, 28]);
1673    /// # })
1674    /// ```
1675    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
1676    where
1677        F: FnMut(I) -> Result<(), ViewError> + Send,
1678    {
1679        self.collection
1680            .for_each_key(|key| {
1681                let index = I::from_custom_bytes(key)?;
1682                f(index)
1683            })
1684            .await?;
1685        Ok(())
1686    }
1687}
1688
1689impl<I, W: HashableView> HashableView for CustomCollectionView<W::Context, I, W>
1690where
1691    Self: View,
1692{
1693    type Hasher = sha3::Sha3_256;
1694
1695    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1696        self.collection.hash_mut().await
1697    }
1698
1699    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1700        self.collection.hash().await
1701    }
1702}
1703
1704/// Type wrapping `ByteCollectionView` while memoizing the hash.
1705pub type HashedByteCollectionView<C, W> =
1706    WrappedHashableContainerView<C, ByteCollectionView<C, W>, HasherOutput>;
1707
1708/// Wrapper around `ByteCollectionView` to compute hashes based on the history of changes.
1709pub type HistoricallyHashedByteCollectionView<C, W> =
1710    HistoricallyHashableView<C, ByteCollectionView<C, W>>;
1711
1712/// Type wrapping `CollectionView` while memoizing the hash.
1713pub type HashedCollectionView<C, I, W> =
1714    WrappedHashableContainerView<C, CollectionView<C, I, W>, HasherOutput>;
1715
1716/// Wrapper around `CollectionView` to compute hashes based on the history of changes.
1717pub type HistoricallyHashedCollectionView<C, I, W> =
1718    HistoricallyHashableView<C, CollectionView<C, I, W>>;
1719
1720/// Type wrapping `CustomCollectionView` while memoizing the hash.
1721pub type HashedCustomCollectionView<C, I, W> =
1722    WrappedHashableContainerView<C, CustomCollectionView<C, I, W>, HasherOutput>;
1723
1724/// Wrapper around `CustomCollectionView` to compute hashes based on the history of changes.
1725pub type HistoricallyHashedCustomCollectionView<C, I, W> =
1726    HistoricallyHashableView<C, CustomCollectionView<C, I, W>>;
1727
1728#[cfg(with_graphql)]
1729mod graphql {
1730    use std::borrow::Cow;
1731
1732    use super::{CollectionView, CustomCollectionView, ReadGuardedView};
1733    use crate::{
1734        graphql::{hash_name, mangle, missing_key_error, Entry, MapInput},
1735        views::View,
1736    };
1737
1738    impl<T: async_graphql::OutputType> async_graphql::OutputType for ReadGuardedView<'_, T> {
1739        fn type_name() -> Cow<'static, str> {
1740            T::type_name()
1741        }
1742
1743        fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
1744            T::create_type_info(registry)
1745        }
1746
1747        async fn resolve(
1748            &self,
1749            ctx: &async_graphql::ContextSelectionSet<'_>,
1750            field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
1751        ) -> async_graphql::ServerResult<async_graphql::Value> {
1752            (**self).resolve(ctx, field).await
1753        }
1754    }
1755
1756    impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
1757        async_graphql::TypeName for CollectionView<C, K, V>
1758    {
1759        fn type_name() -> Cow<'static, str> {
1760            format!(
1761                "CollectionView_{}_{}_{:08x}",
1762                mangle(K::type_name()),
1763                mangle(V::type_name()),
1764                hash_name::<(K, V)>(),
1765            )
1766            .into()
1767        }
1768    }
1769
1770    #[async_graphql::Object(cache_control(no_cache), name_type)]
1771    impl<K, V> CollectionView<V::Context, K, V>
1772    where
1773        K: async_graphql::InputType
1774            + async_graphql::OutputType
1775            + serde::ser::Serialize
1776            + serde::de::DeserializeOwned
1777            + std::fmt::Debug,
1778        V: View + async_graphql::OutputType,
1779    {
1780        async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
1781            Ok(self.indices().await?)
1782        }
1783
1784        #[graphql(derived(name = "count"))]
1785        async fn count_(&self) -> Result<u32, async_graphql::Error> {
1786            Ok(self.count().await? as u32)
1787        }
1788
1789        async fn entry(
1790            &self,
1791            key: K,
1792        ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
1793            let value = self
1794                .try_load_entry(&key)
1795                .await?
1796                .ok_or_else(|| missing_key_error(&key))?;
1797            Ok(Entry { value, key })
1798        }
1799
1800        async fn entries(
1801            &self,
1802            input: Option<MapInput<K>>,
1803        ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
1804            let keys = if let Some(keys) = input
1805                .and_then(|input| input.filters)
1806                .and_then(|filters| filters.keys)
1807            {
1808                keys
1809            } else {
1810                self.indices().await?
1811            };
1812
1813            let values = self.try_load_entries(&keys).await?;
1814            Ok(values
1815                .into_iter()
1816                .zip(keys)
1817                .filter_map(|(value, key)| value.map(|value| Entry { value, key }))
1818                .collect())
1819        }
1820    }
1821
1822    impl<C: Send + Sync, K: async_graphql::InputType, V: async_graphql::OutputType>
1823        async_graphql::TypeName for CustomCollectionView<C, K, V>
1824    {
1825        fn type_name() -> Cow<'static, str> {
1826            format!(
1827                "CustomCollectionView_{}_{}_{:08x}",
1828                mangle(K::type_name()),
1829                mangle(V::type_name()),
1830                hash_name::<(K, V)>(),
1831            )
1832            .into()
1833        }
1834    }
1835
1836    #[async_graphql::Object(cache_control(no_cache), name_type)]
1837    impl<K, V> CustomCollectionView<V::Context, K, V>
1838    where
1839        K: async_graphql::InputType
1840            + async_graphql::OutputType
1841            + crate::common::CustomSerialize
1842            + std::fmt::Debug,
1843        V: View + async_graphql::OutputType,
1844    {
1845        async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
1846            Ok(self.indices().await?)
1847        }
1848
1849        #[graphql(derived(name = "count"))]
1850        async fn count_(&self) -> Result<u32, async_graphql::Error> {
1851            Ok(self.count().await? as u32)
1852        }
1853
1854        async fn entry(
1855            &self,
1856            key: K,
1857        ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
1858            let value = self
1859                .try_load_entry(&key)
1860                .await?
1861                .ok_or_else(|| missing_key_error(&key))?;
1862            Ok(Entry { value, key })
1863        }
1864
1865        async fn entries(
1866            &self,
1867            input: Option<MapInput<K>>,
1868        ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
1869            let keys = if let Some(keys) = input
1870                .and_then(|input| input.filters)
1871                .and_then(|filters| filters.keys)
1872            {
1873                keys
1874            } else {
1875                self.indices().await?
1876            };
1877
1878            let values = self.try_load_entries(&keys).await?;
1879            Ok(values
1880                .into_iter()
1881                .zip(keys)
1882                .filter_map(|(value, key)| value.map(|value| Entry { value, key }))
1883                .collect())
1884        }
1885    }
1886}