linera_views/views/reentrant_collection_view.rs
1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5 borrow::Borrow,
6 collections::{btree_map, BTreeMap},
7 io::Write,
8 marker::PhantomData,
9 mem,
10 sync::{Arc, Mutex},
11};
12
13use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc};
14#[cfg(with_metrics)]
15use linera_base::prometheus_util::MeasureLatency as _;
16use serde::{de::DeserializeOwned, Serialize};
17
18use crate::{
19 batch::Batch,
20 common::{CustomSerialize, HasherOutput, Update},
21 context::{BaseKey, Context},
22 hashable_wrapper::WrappedHashableContainerView,
23 store::ReadableKeyValueStore as _,
24 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
25};
26
27#[cfg(with_metrics)]
28mod metrics {
29 use std::sync::LazyLock;
30
31 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
32 use prometheus::HistogramVec;
33
    /// The runtime of hash computation.
35 pub static REENTRANT_COLLECTION_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
36 LazyLock::new(|| {
37 register_histogram_vec(
38 "reentrant_collection_view_hash_runtime",
39 "ReentrantCollectionView hash runtime",
40 &[],
41 exponential_bucket_latencies(5.0),
42 )
43 });
44}
45
46/// A read-only accessor for a particular subview in a [`ReentrantCollectionView`].
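///
/// The guard holds a read lock on the subview for as long as it is alive and dereferences to the
/// wrapped view; it is obtained through the `try_load_entry`-style methods below.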
47#[derive(Debug)]
48pub struct ReadGuardedView<T>(RwLockReadGuardArc<T>);
49
50impl<T> std::ops::Deref for ReadGuardedView<T> {
51 type Target = T;
52 fn deref(&self) -> &T {
53 self.0.deref()
54 }
55}
56
57/// A read-write accessor for a particular subview in a [`ReentrantCollectionView`].
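///
/// The guard holds a write lock on the subview for as long as it is alive and dereferences
/// (mutably) to the wrapped view; it is obtained through the `try_load_entry_mut`-style methods below.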
58#[derive(Debug)]
59pub struct WriteGuardedView<T>(RwLockWriteGuardArc<T>);
60
61impl<T> std::ops::Deref for WriteGuardedView<T> {
62 type Target = T;
63 fn deref(&self) -> &T {
64 self.0.deref()
65 }
66}
67
68impl<T> std::ops::DerefMut for WriteGuardedView<T> {
69 fn deref_mut(&mut self) -> &mut T {
70 self.0.deref_mut()
71 }
72}
73
74/// A view that supports accessing a collection of views of the same kind, indexed by `Vec<u8>`,
75/// possibly several subviews at a time.
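///
/// Example (a minimal sketch using the in-memory test context, mirroring the examples on the
/// methods below):
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
///     ReentrantByteCollectionView::load(context).await.unwrap();
/// {
///     let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
///     *subview.get_mut() = "Hello".to_string();
/// }
/// assert_eq!(view.count().await.unwrap(), 1);
/// # })
/// ```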
76#[derive(Debug)]
77pub struct ReentrantByteCollectionView<C, W> {
78 /// The view [`Context`].
79 context: C,
    /// Whether the current persisted data will be completely erased and replaced on the next flush.
81 delete_storage_first: bool,
82 /// Entries that may have staged changes.
83 updates: BTreeMap<Vec<u8>, Update<Arc<RwLock<W>>>>,
84 /// Entries cached in memory that have the exact same state as in the persistent storage.
85 cached_entries: Mutex<BTreeMap<Vec<u8>, Arc<RwLock<W>>>>,
86}
87
88impl<W, C2> ReplaceContext<C2> for ReentrantByteCollectionView<W::Context, W>
89where
90 W: View + ReplaceContext<C2>,
91 C2: Context,
92{
93 type Target = ReentrantByteCollectionView<C2, <W as ReplaceContext<C2>>::Target>;
94
95 async fn with_context(
96 &mut self,
97 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
98 ) -> Self::Target {
99 let mut updates: BTreeMap<_, Update<Arc<RwLock<W::Target>>>> = BTreeMap::new();
100 let mut cached_entries = BTreeMap::new();
101 for (key, update) in &self.updates {
102 let new_value = match update {
103 Update::Removed => Update::Removed,
104 Update::Set(x) => Update::Set(Arc::new(RwLock::new(
105 x.write().await.with_context(ctx.clone()).await,
106 ))),
107 };
108 updates.insert(key.clone(), new_value);
109 }
110 let old_cached_entries = self.cached_entries.lock().unwrap().clone();
111 for (key, entry) in old_cached_entries {
112 cached_entries.insert(
113 key,
114 Arc::new(RwLock::new(
115 entry.write().await.with_context(ctx.clone()).await,
116 )),
117 );
118 }
119 ReentrantByteCollectionView {
120 context: ctx(self.context()),
121 delete_storage_first: self.delete_storage_first,
122 updates,
123 cached_entries: Mutex::new(cached_entries),
124 }
125 }
126}
127
/// We need to derive new base keys in order to implement the collection view.
/// We do this by appending a tag byte to the base key.
///
/// Sub-views in a collection share a common key prefix, like in other view types. However,
/// just concatenating the shared prefix with sub-view keys would make it impossible to tell
/// whether a given key belongs to a child sub-view or to a grandchild sub-view (consider, for
/// example, a collection stored inside the collection).
135#[repr(u8)]
136enum KeyTag {
    /// Prefix for an index; the presence of such a key indicates the existence of an entry in the collection.
138 Index = MIN_VIEW_TAG,
    /// Prefix for the sub-view stored under the corresponding index.
140 Subview,
141}
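// For illustration (a sketch of the key layout, not normative): with a collection base key `B` and
// a collection index `i`, the presence marker for the entry is stored under
// `B ++ [KeyTag::Index] ++ i`, while the sub-view at `i` gets the base key
// `B ++ [KeyTag::Subview] ++ i`, so child and grandchild keys can never collide.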
142
143impl<W: View> View for ReentrantByteCollectionView<W::Context, W> {
144 const NUM_INIT_KEYS: usize = 0;
145
146 type Context = W::Context;
147
148 fn context(&self) -> &Self::Context {
149 &self.context
150 }
151
152 fn pre_load(_context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
153 Ok(Vec::new())
154 }
155
156 fn post_load(context: Self::Context, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
157 Ok(Self {
158 context,
159 delete_storage_first: false,
160 updates: BTreeMap::new(),
161 cached_entries: Mutex::new(BTreeMap::new()),
162 })
163 }
164
165 async fn load(context: Self::Context) -> Result<Self, ViewError> {
166 Self::post_load(context, &[])
167 }
168
169 fn rollback(&mut self) {
170 self.delete_storage_first = false;
171 self.updates.clear();
172 }
173
174 async fn has_pending_changes(&self) -> bool {
175 if self.delete_storage_first {
176 return true;
177 }
178 !self.updates.is_empty()
179 }
180
181 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
182 let mut delete_view = false;
183 if self.delete_storage_first {
184 delete_view = true;
185 batch.delete_key_prefix(self.context.base_key().bytes.clone());
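            // The whole view is reported as deleted only if no entry below re-inserts data.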
186 for (index, update) in mem::take(&mut self.updates) {
187 if let Update::Set(view) = update {
188 let mut view = Arc::try_unwrap(view)
189 .map_err(|_| ViewError::TryLockError(index.clone()))?
190 .into_inner();
191 view.flush(batch)?;
192 self.add_index(batch, &index);
193 delete_view = false;
194 }
195 }
196 } else {
197 for (index, update) in mem::take(&mut self.updates) {
198 match update {
199 Update::Set(view) => {
200 let mut view = Arc::try_unwrap(view)
201 .map_err(|_| ViewError::TryLockError(index.clone()))?
202 .into_inner();
203 view.flush(batch)?;
204 self.add_index(batch, &index);
205 }
206 Update::Removed => {
207 let key_subview = self.get_subview_key(&index);
208 let key_index = self.get_index_key(&index);
209 batch.delete_key(key_index);
210 batch.delete_key_prefix(key_subview);
211 }
212 }
213 }
214 }
215 self.delete_storage_first = false;
216 Ok(delete_view)
217 }
218
219 fn clear(&mut self) {
220 self.delete_storage_first = true;
221 self.updates.clear();
222 self.cached_entries.get_mut().unwrap().clear();
223 }
224}
225
226impl<W: ClonableView> ClonableView for ReentrantByteCollectionView<W::Context, W> {
227 fn clone_unchecked(&mut self) -> Self {
228 let cloned_updates = self
229 .updates
230 .iter()
231 .map(|(key, value)| {
232 let cloned_value = match value {
233 Update::Removed => Update::Removed,
234 Update::Set(view_lock) => {
235 let mut view = view_lock
236 .try_write()
237 .expect("Unable to acquire write lock during clone_unchecked");
238
239 Update::Set(Arc::new(RwLock::new(view.clone_unchecked())))
240 }
241 };
242 (key.clone(), cloned_value)
243 })
244 .collect();
245
246 ReentrantByteCollectionView {
247 context: self.context.clone(),
248 delete_storage_first: self.delete_storage_first,
249 updates: cloned_updates,
250 cached_entries: Mutex::new(BTreeMap::new()),
251 }
252 }
253}
254
255impl<C: Context, W> ReentrantByteCollectionView<C, W> {
256 fn get_index_key(&self, index: &[u8]) -> Vec<u8> {
257 self.context
258 .base_key()
259 .base_tag_index(KeyTag::Index as u8, index)
260 }
261
262 fn get_subview_key(&self, index: &[u8]) -> Vec<u8> {
263 self.context
264 .base_key()
265 .base_tag_index(KeyTag::Subview as u8, index)
266 }
267
268 fn add_index(&self, batch: &mut Batch, index: &[u8]) {
269 let key = self.get_index_key(index);
270 batch.put_key_value_bytes(key, vec![]);
271 }
272}
273
274impl<W: View> ReentrantByteCollectionView<W::Context, W> {
    /// Loads the view from storage, or returns the default view if it is missing or the storage is about to be erased.
276 async fn wrapped_view(
277 context: &W::Context,
278 delete_storage_first: bool,
279 short_key: &[u8],
280 ) -> Result<Arc<RwLock<W>>, ViewError> {
281 let key = context
282 .base_key()
283 .base_tag_index(KeyTag::Subview as u8, short_key);
284 let context = context.clone_with_base_key(key);
285 // Obtain a view and set its pending state to the default (e.g. empty) state
286 let view = if delete_storage_first {
287 W::new(context)?
288 } else {
289 W::load(context).await?
290 };
291 Ok(Arc::new(RwLock::new(view)))
292 }
293
    /// Loads the view and inserts it into `updates` if needed.
    /// If the entry is missing, it is set to the default view.
296 async fn try_load_view_mut(&mut self, short_key: &[u8]) -> Result<Arc<RwLock<W>>, ViewError> {
297 use btree_map::Entry::*;
298 Ok(match self.updates.entry(short_key.to_owned()) {
299 Occupied(mut entry) => match entry.get_mut() {
300 Update::Set(view) => view.clone(),
301 entry @ Update::Removed => {
302 let wrapped_view = Self::wrapped_view(&self.context, true, short_key).await?;
303 *entry = Update::Set(wrapped_view.clone());
304 wrapped_view
305 }
306 },
307 Vacant(entry) => {
308 let wrapped_view = match self.cached_entries.get_mut().unwrap().remove(short_key) {
309 Some(view) => view,
310 None => {
311 Self::wrapped_view(&self.context, self.delete_storage_first, short_key)
312 .await?
313 }
314 };
315 entry.insert(Update::Set(wrapped_view.clone()));
316 wrapped_view
317 }
318 })
319 }
320
    /// Loads the view from `updates` if available.
    /// Otherwise, the entry is loaded from storage; if it is
    /// missing there as well, `None` is returned.
324 async fn try_load_view(&self, short_key: &[u8]) -> Result<Option<Arc<RwLock<W>>>, ViewError> {
325 Ok(if let Some(entry) = self.updates.get(short_key) {
326 match entry {
327 Update::Set(view) => Some(view.clone()),
328 _entry @ Update::Removed => None,
329 }
330 } else if self.delete_storage_first {
331 None
332 } else {
333 let view = {
334 let cached_entries = self.cached_entries.lock().unwrap();
335 let view = cached_entries.get(short_key);
336 view.cloned()
337 };
338 if let Some(view) = view {
339 Some(view.clone())
340 } else {
341 let key_index = self
342 .context
343 .base_key()
344 .base_tag_index(KeyTag::Index as u8, short_key);
345 if self.context.store().contains_key(&key_index).await? {
346 let view = Self::wrapped_view(&self.context, false, short_key).await?;
347 let mut cached_entries = self.cached_entries.lock().unwrap();
348 cached_entries.insert(short_key.to_owned(), view.clone());
349 Some(view)
350 } else {
351 None
352 }
353 }
354 })
355 }
356
357 /// Loads a subview for the data at the given index in the collection. If an entry
358 /// is absent then a default entry is added to the collection. The resulting view
359 /// can be modified.
360 /// ```rust
361 /// # tokio_test::block_on(async {
362 /// # use linera_views::context::MemoryContext;
363 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
364 /// # use linera_views::register_view::RegisterView;
365 /// # use linera_views::views::View;
366 /// # let context = MemoryContext::new_for_testing(());
367 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
368 /// ReentrantByteCollectionView::load(context).await.unwrap();
369 /// let subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
370 /// let value = subview.get();
371 /// assert_eq!(*value, String::default());
372 /// # })
373 /// ```
374 pub async fn try_load_entry_mut(
375 &mut self,
376 short_key: &[u8],
377 ) -> Result<WriteGuardedView<W>, ViewError> {
378 Ok(WriteGuardedView(
379 self.try_load_view_mut(short_key)
380 .await?
381 .try_write_arc()
382 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
383 ))
384 }
385
386 /// Loads a subview at the given index in the collection and gives read-only access to the data.
387 /// If an entry is absent then `None` is returned.
388 /// ```rust
389 /// # tokio_test::block_on(async {
390 /// # use linera_views::context::MemoryContext;
391 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
392 /// # use linera_views::register_view::RegisterView;
393 /// # use linera_views::views::View;
394 /// # let context = MemoryContext::new_for_testing(());
395 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
396 /// ReentrantByteCollectionView::load(context).await.unwrap();
397 /// {
398 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
399 /// }
400 /// let subview = view.try_load_entry(&[0, 1]).await.unwrap().unwrap();
401 /// let value = subview.get();
402 /// assert_eq!(*value, String::default());
403 /// # })
404 /// ```
405 pub async fn try_load_entry(
406 &self,
407 short_key: &[u8],
408 ) -> Result<Option<ReadGuardedView<W>>, ViewError> {
409 match self.try_load_view(short_key).await? {
410 None => Ok(None),
411 Some(view) => Ok(Some(ReadGuardedView(
412 view.try_read_arc()
413 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
414 ))),
415 }
416 }
417
418 /// Returns `true` if the collection contains a value for the specified key.
419 /// ```rust
420 /// # tokio_test::block_on(async {
421 /// # use linera_views::context::MemoryContext;
422 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
423 /// # use linera_views::register_view::RegisterView;
424 /// # use linera_views::views::View;
425 /// # let context = MemoryContext::new_for_testing(());
426 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
427 /// ReentrantByteCollectionView::load(context).await.unwrap();
428 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
429 /// assert!(view.contains_key(&[0, 1]).await.unwrap());
430 /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
431 /// # })
432 /// ```
433 pub async fn contains_key(&self, short_key: &[u8]) -> Result<bool, ViewError> {
434 Ok(if let Some(entry) = self.updates.get(short_key) {
435 match entry {
436 Update::Set(_view) => true,
437 Update::Removed => false,
438 }
439 } else if self.delete_storage_first {
440 false
441 } else if self.cached_entries.lock().unwrap().contains_key(short_key) {
442 true
443 } else {
444 let key_index = self
445 .context
446 .base_key()
447 .base_tag_index(KeyTag::Index as u8, short_key);
448 self.context.store().contains_key(&key_index).await?
449 })
450 }
451
452 /// Removes an entry. If absent then nothing happens.
453 /// ```rust
454 /// # tokio_test::block_on(async {
455 /// # use linera_views::context::MemoryContext;
456 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
457 /// # use linera_views::register_view::RegisterView;
458 /// # use linera_views::views::View;
459 /// # let context = MemoryContext::new_for_testing(());
460 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
461 /// ReentrantByteCollectionView::load(context).await.unwrap();
462 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
463 /// let value = subview.get_mut();
464 /// assert_eq!(*value, String::default());
465 /// view.remove_entry(vec![0, 1]);
466 /// let keys = view.keys().await.unwrap();
467 /// assert_eq!(keys.len(), 0);
468 /// # })
469 /// ```
470 pub fn remove_entry(&mut self, short_key: Vec<u8>) {
471 self.cached_entries.get_mut().unwrap().remove(&short_key);
472 if self.delete_storage_first {
473 // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
474 self.updates.remove(&short_key);
475 } else {
476 self.updates.insert(short_key, Update::Removed);
477 }
478 }
479
    /// Marks the entry so that it is reset to the default value in the next flush.
481 /// ```rust
482 /// # tokio_test::block_on(async {
483 /// # use linera_views::context::MemoryContext;
484 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
485 /// # use linera_views::register_view::RegisterView;
486 /// # use linera_views::views::View;
487 /// # let context = MemoryContext::new_for_testing(());
488 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
489 /// ReentrantByteCollectionView::load(context).await.unwrap();
490 /// {
491 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
492 /// let value = subview.get_mut();
493 /// *value = String::from("Hello");
494 /// }
495 /// view.try_reset_entry_to_default(&[0, 1]).unwrap();
496 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
497 /// let value = subview.get_mut();
498 /// assert_eq!(*value, String::default());
499 /// # })
500 /// ```
501 pub fn try_reset_entry_to_default(&mut self, short_key: &[u8]) -> Result<(), ViewError> {
502 let key = self
503 .context
504 .base_key()
505 .base_tag_index(KeyTag::Subview as u8, short_key);
506 let context = self.context.clone_with_base_key(key);
507 let view = W::new(context)?;
508 let view = Arc::new(RwLock::new(view));
509 let view = Update::Set(view);
510 self.updates.insert(short_key.to_vec(), view);
511 self.cached_entries.get_mut().unwrap().remove(short_key);
512 Ok(())
513 }
514
515 /// Gets the extra data.
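    /// A minimal sketch (the in-memory test context used in this module's examples carries `()` as
    /// its extra data):
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
    /// # use linera_views::register_view::RegisterView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
    ///     ReentrantByteCollectionView::load(context).await.unwrap();
    /// assert_eq!(*view.extra(), ());
    /// # })
    /// ```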
516 pub fn extra(&self) -> &<W::Context as Context>::Extra {
517 self.context.extra()
518 }
519}
520
521impl<W: View> ReentrantByteCollectionView<W::Context, W> {
    /// Loads multiple entries for writing at once.
    /// The entries in `short_keys` must all be distinct.
524 /// ```rust
525 /// # tokio_test::block_on(async {
526 /// # use linera_views::context::MemoryContext;
527 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
528 /// # use linera_views::register_view::RegisterView;
529 /// # use linera_views::views::View;
530 /// # let context = MemoryContext::new_for_testing(());
531 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
532 /// ReentrantByteCollectionView::load(context).await.unwrap();
533 /// {
534 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
535 /// *subview.get_mut() = "Bonjour".to_string();
536 /// }
537 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
538 /// let subviews = view.try_load_entries_mut(short_keys).await.unwrap();
539 /// let value1 = subviews[0].get();
540 /// let value2 = subviews[1].get();
541 /// assert_eq!(*value1, "Bonjour".to_string());
542 /// assert_eq!(*value2, String::default());
543 /// # })
544 /// ```
545 pub async fn try_load_entries_mut(
546 &mut self,
547 short_keys: Vec<Vec<u8>>,
548 ) -> Result<Vec<WriteGuardedView<W>>, ViewError> {
549 let cached_entries = self.cached_entries.get_mut().unwrap();
550 let mut short_keys_to_load = Vec::new();
551 let mut keys = Vec::new();
552 for short_key in &short_keys {
553 let key = self
554 .context
555 .base_key()
556 .base_tag_index(KeyTag::Subview as u8, short_key);
557 let context = self.context.clone_with_base_key(key);
558 match self.updates.entry(short_key.to_vec()) {
559 btree_map::Entry::Occupied(mut entry) => {
560 if let Update::Removed = entry.get() {
561 let view = W::new(context)?;
562 let view = Arc::new(RwLock::new(view));
563 entry.insert(Update::Set(view));
564 cached_entries.remove(short_key);
565 }
566 }
567 btree_map::Entry::Vacant(entry) => {
568 if self.delete_storage_first {
569 cached_entries.remove(short_key);
570 let view = W::new(context)?;
571 let view = Arc::new(RwLock::new(view));
572 entry.insert(Update::Set(view));
573 } else if let Some(view) = cached_entries.remove(short_key) {
574 entry.insert(Update::Set(view));
575 } else {
576 keys.extend(W::pre_load(&context)?);
577 short_keys_to_load.push(short_key.to_vec());
578 }
579 }
580 }
581 }
582 let values = self.context.store().read_multi_values_bytes(keys).await?;
583 for (loaded_values, short_key) in values
584 .chunks_exact(W::NUM_INIT_KEYS)
585 .zip(short_keys_to_load)
586 {
587 let key = self
588 .context
589 .base_key()
590 .base_tag_index(KeyTag::Subview as u8, &short_key);
591 let context = self.context.clone_with_base_key(key);
592 let view = W::post_load(context, loaded_values)?;
593 let wrapped_view = Arc::new(RwLock::new(view));
594 self.updates
595 .insert(short_key.to_vec(), Update::Set(wrapped_view));
596 }
597
598 short_keys
599 .into_iter()
600 .map(|short_key| {
601 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
602 unreachable!()
603 };
604 Ok(WriteGuardedView(
605 view.clone()
606 .try_write_arc()
607 .ok_or_else(|| ViewError::TryLockError(short_key))?,
608 ))
609 })
610 .collect()
611 }
612
    /// Loads multiple entries for reading at once.
    /// The entries in `short_keys` must all be distinct.
615 /// ```rust
616 /// # tokio_test::block_on(async {
617 /// # use linera_views::context::MemoryContext;
618 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
619 /// # use linera_views::register_view::RegisterView;
620 /// # use linera_views::views::View;
621 /// # let context = MemoryContext::new_for_testing(());
622 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
623 /// ReentrantByteCollectionView::load(context).await.unwrap();
624 /// {
625 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
626 /// }
627 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
628 /// let subviews = view.try_load_entries(short_keys).await.unwrap();
629 /// assert!(subviews[1].is_none());
630 /// let value0 = subviews[0].as_ref().unwrap().get();
631 /// assert_eq!(*value0, String::default());
632 /// # })
633 /// ```
634 pub async fn try_load_entries(
635 &self,
636 short_keys: Vec<Vec<u8>>,
637 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> {
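        // Resolve each key from, in order: staged `updates`, the `cached_entries` cache, and
        // finally storage (batching the existence checks and the actual loads).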
638 let mut results = vec![None; short_keys.len()];
639 let mut keys_to_check = Vec::new();
640 let mut keys_to_check_metadata = Vec::new();
641
642 {
643 let cached_entries = self.cached_entries.lock().unwrap();
644 for (position, short_key) in short_keys.into_iter().enumerate() {
645 if let Some(update) = self.updates.get(&short_key) {
646 if let Update::Set(view) = update {
647 results[position] = Some((short_key, view.clone()));
648 }
649 } else if let Some(view) = cached_entries.get(&short_key) {
650 results[position] = Some((short_key, view.clone()));
651 } else if !self.delete_storage_first {
652 let key_index = self
653 .context
654 .base_key()
655 .base_tag_index(KeyTag::Index as u8, &short_key);
656 keys_to_check.push(key_index);
657 keys_to_check_metadata.push((position, short_key));
658 }
659 }
660 }
661
662 let found_keys = self.context.store().contains_keys(keys_to_check).await?;
663 let entries_to_load = keys_to_check_metadata
664 .into_iter()
665 .zip(found_keys)
666 .filter_map(|(metadata, found)| found.then_some(metadata))
667 .map(|(position, short_key)| {
668 let subview_key = self
669 .context
670 .base_key()
671 .base_tag_index(KeyTag::Subview as u8, &short_key);
672 let subview_context = self.context.clone_with_base_key(subview_key);
673 (position, short_key.to_owned(), subview_context)
674 })
675 .collect::<Vec<_>>();
676 if !entries_to_load.is_empty() {
677 let mut keys_to_load = Vec::with_capacity(entries_to_load.len() * W::NUM_INIT_KEYS);
678 for (_, _, context) in &entries_to_load {
679 keys_to_load.extend(W::pre_load(context)?);
680 }
681 let values = self
682 .context
683 .store()
684 .read_multi_values_bytes(keys_to_load)
685 .await?;
686 let mut cached_entries = self.cached_entries.lock().unwrap();
687 for (loaded_values, (position, short_key, context)) in
688 values.chunks_exact(W::NUM_INIT_KEYS).zip(entries_to_load)
689 {
690 let view = W::post_load(context, loaded_values)?;
691 let wrapped_view = Arc::new(RwLock::new(view));
692 cached_entries.insert(short_key.clone(), wrapped_view.clone());
693 results[position] = Some((short_key, wrapped_view));
694 }
695 }
696
697 results
698 .into_iter()
699 .map(|maybe_view| match maybe_view {
700 Some((short_key, view)) => Ok(Some(ReadGuardedView(
701 view.try_read_arc()
702 .ok_or_else(|| ViewError::TryLockError(short_key))?,
703 ))),
704 None => Ok(None),
705 })
706 .collect()
707 }
708
709 /// Loads all the entries for reading at once.
710 /// ```rust
711 /// # tokio_test::block_on(async {
712 /// # use linera_views::context::MemoryContext;
713 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
714 /// # use linera_views::register_view::RegisterView;
715 /// # use linera_views::views::View;
716 /// # let context = MemoryContext::new_for_testing(());
717 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
718 /// ReentrantByteCollectionView::load(context).await.unwrap();
719 /// {
720 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
721 /// }
722 /// let subviews = view.try_load_all_entries().await.unwrap();
723 /// assert_eq!(subviews.len(), 1);
724 /// # })
725 /// ```
726 pub async fn try_load_all_entries(
727 &self,
728 ) -> Result<Vec<(Vec<u8>, ReadGuardedView<W>)>, ViewError> {
729 let short_keys = self.keys().await?;
730 if !self.delete_storage_first {
731 let mut keys = Vec::new();
732 let mut short_keys_to_load = Vec::new();
733 {
734 let cached_entries = self.cached_entries.lock().unwrap();
735 for short_key in &short_keys {
736 if !self.updates.contains_key(short_key)
737 && !cached_entries.contains_key(short_key)
738 {
739 let key = self
740 .context
741 .base_key()
742 .base_tag_index(KeyTag::Subview as u8, short_key);
743 let context = self.context.clone_with_base_key(key);
744 keys.extend(W::pre_load(&context)?);
745 short_keys_to_load.push(short_key.to_vec());
746 }
747 }
748 }
749 let values = self.context.store().read_multi_values_bytes(keys).await?;
750 {
751 let mut cached_entries = self.cached_entries.lock().unwrap();
752 for (loaded_values, short_key) in values
753 .chunks_exact(W::NUM_INIT_KEYS)
754 .zip(short_keys_to_load)
755 {
756 let key = self
757 .context
758 .base_key()
759 .base_tag_index(KeyTag::Subview as u8, &short_key);
760 let context = self.context.clone_with_base_key(key);
761 let view = W::post_load(context, loaded_values)?;
762 let wrapped_view = Arc::new(RwLock::new(view));
763 cached_entries.insert(short_key.to_vec(), wrapped_view);
764 }
765 }
766 }
767 let cached_entries = self.cached_entries.lock().unwrap();
768 short_keys
769 .into_iter()
770 .map(|short_key| {
771 let view = if let Some(Update::Set(view)) = self.updates.get(&short_key) {
772 view.clone()
773 } else if let Some(view) = cached_entries.get(&short_key) {
774 view.clone()
775 } else {
776 unreachable!("All entries should have been loaded into memory");
777 };
778 let guard = ReadGuardedView(
779 view.try_read_arc()
780 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
781 );
782 Ok((short_key, guard))
783 })
784 .collect()
785 }
786
787 /// Loads all the entries for writing at once.
788 /// ```rust
789 /// # tokio_test::block_on(async {
790 /// # use linera_views::context::MemoryContext;
791 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
792 /// # use linera_views::register_view::RegisterView;
793 /// # use linera_views::views::View;
794 /// # let context = MemoryContext::new_for_testing(());
795 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
796 /// ReentrantByteCollectionView::load(context).await.unwrap();
797 /// {
798 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
799 /// }
800 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
801 /// assert_eq!(subviews.len(), 1);
802 /// # })
803 /// ```
804 pub async fn try_load_all_entries_mut(
805 &mut self,
806 ) -> Result<Vec<(Vec<u8>, WriteGuardedView<W>)>, ViewError> {
807 let short_keys = self.keys().await?;
808 if !self.delete_storage_first {
809 let mut keys = Vec::new();
810 let mut short_keys_to_load = Vec::new();
811 {
812 let cached_entries = self.cached_entries.get_mut().unwrap();
813 for short_key in &short_keys {
814 if !self.updates.contains_key(short_key) {
815 if let Some(view) = cached_entries.remove(short_key) {
816 self.updates.insert(short_key.to_vec(), Update::Set(view));
817 } else {
818 let key = self
819 .context
820 .base_key()
821 .base_tag_index(KeyTag::Subview as u8, short_key);
822 let context = self.context.clone_with_base_key(key);
823 keys.extend(W::pre_load(&context)?);
824 short_keys_to_load.push(short_key.to_vec());
825 }
826 }
827 }
828 }
829 let values = self.context.store().read_multi_values_bytes(keys).await?;
830 for (loaded_values, short_key) in values
831 .chunks_exact(W::NUM_INIT_KEYS)
832 .zip(short_keys_to_load)
833 {
834 let key = self
835 .context
836 .base_key()
837 .base_tag_index(KeyTag::Subview as u8, &short_key);
838 let context = self.context.clone_with_base_key(key);
839 let view = W::post_load(context, loaded_values)?;
840 let wrapped_view = Arc::new(RwLock::new(view));
841 self.updates
842 .insert(short_key.to_vec(), Update::Set(wrapped_view));
843 }
844 }
845 short_keys
846 .into_iter()
847 .map(|short_key| {
848 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
849 unreachable!("All entries should have been loaded into `updates`")
850 };
851 let guard = WriteGuardedView(
852 view.clone()
853 .try_write_arc()
854 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
855 );
856 Ok((short_key, guard))
857 })
858 .collect()
859 }
860}
861
862impl<W: View> ReentrantByteCollectionView<W::Context, W> {
863 /// Returns the list of indices in the collection in lexicographic order.
864 /// ```rust
865 /// # tokio_test::block_on(async {
866 /// # use linera_views::context::MemoryContext;
867 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
868 /// # use linera_views::register_view::RegisterView;
869 /// # use linera_views::views::View;
870 /// # let context = MemoryContext::new_for_testing(());
871 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
872 /// ReentrantByteCollectionView::load(context).await.unwrap();
873 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
874 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
875 /// let keys = view.keys().await.unwrap();
876 /// assert_eq!(keys, vec![vec![0, 1], vec![0, 2]]);
877 /// # })
878 /// ```
879 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
880 let mut keys = Vec::new();
881 self.for_each_key(|key| {
882 keys.push(key.to_vec());
883 Ok(())
884 })
885 .await?;
886 Ok(keys)
887 }
888
    /// Returns the number of indices in the collection.
890 /// ```rust
891 /// # tokio_test::block_on(async {
892 /// # use linera_views::context::MemoryContext;
893 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
894 /// # use linera_views::register_view::RegisterView;
895 /// # use linera_views::views::View;
896 /// # let context = MemoryContext::new_for_testing(());
897 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
898 /// ReentrantByteCollectionView::load(context).await.unwrap();
899 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
900 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
901 /// assert_eq!(view.count().await.unwrap(), 2);
902 /// # })
903 /// ```
904 pub async fn count(&self) -> Result<usize, ViewError> {
905 let mut count = 0;
906 self.for_each_key(|_key| {
907 count += 1;
908 Ok(())
909 })
910 .await?;
911 Ok(count)
912 }
913
    /// Applies a function `f` on each index (aka key). Keys are visited in
    /// lexicographic order. If the function returns `false`, then the loop
    /// ends prematurely.
917 /// ```rust
918 /// # tokio_test::block_on(async {
919 /// # use linera_views::context::MemoryContext;
920 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
921 /// # use linera_views::register_view::RegisterView;
922 /// # use linera_views::views::View;
923 /// # let context = MemoryContext::new_for_testing(());
924 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
925 /// ReentrantByteCollectionView::load(context).await.unwrap();
926 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
927 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
928 /// let mut count = 0;
929 /// view.for_each_key_while(|_key| {
930 /// count += 1;
931 /// Ok(count < 1)
932 /// })
933 /// .await
934 /// .unwrap();
935 /// assert_eq!(count, 1);
936 /// # })
937 /// ```
938 pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
939 where
940 F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
941 {
942 let mut updates = self.updates.iter();
943 let mut update = updates.next();
944 if !self.delete_storage_first {
945 let base = self.get_index_key(&[]);
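            // Merge the index keys found in storage with the staged `updates`, visiting them in
            // lexicographic order and skipping keys whose latest update is a removal.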
946 for index in self.context.store().find_keys_by_prefix(&base).await? {
947 loop {
948 match update {
949 Some((key, value)) if key <= &index => {
950 if let Update::Set(_) = value {
951 if !f(key)? {
952 return Ok(());
953 }
954 }
955 update = updates.next();
956 if key == &index {
957 break;
958 }
959 }
960 _ => {
961 if !f(&index)? {
962 return Ok(());
963 }
964 break;
965 }
966 }
967 }
968 }
969 }
970 while let Some((key, value)) = update {
971 if let Update::Set(_) = value {
972 if !f(key)? {
973 return Ok(());
974 }
975 }
976 update = updates.next();
977 }
978 Ok(())
979 }
980
    /// Applies a function `f` on each index (aka key). Keys are visited in
    /// lexicographic order.
983 /// ```rust
984 /// # tokio_test::block_on(async {
985 /// # use linera_views::context::MemoryContext;
986 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
987 /// # use linera_views::register_view::RegisterView;
988 /// # use linera_views::views::View;
989 /// # let context = MemoryContext::new_for_testing(());
990 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
991 /// ReentrantByteCollectionView::load(context).await.unwrap();
992 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
993 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
994 /// let mut count = 0;
995 /// view.for_each_key(|_key| {
996 /// count += 1;
997 /// Ok(())
998 /// })
999 /// .await
1000 /// .unwrap();
1001 /// assert_eq!(count, 2);
1002 /// # })
1003 /// ```
1004 pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
1005 where
1006 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
1007 {
1008 self.for_each_key_while(|key| {
1009 f(key)?;
1010 Ok(true)
1011 })
1012 .await
1013 }
1014}
1015
1016impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W> {
1017 type Hasher = sha3::Sha3_256;
1018
1019 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1020 #[cfg(with_metrics)]
1021 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
1022 let mut hasher = sha3::Sha3_256::default();
1023 let keys = self.keys().await?;
1024 let count = keys.len() as u32;
1025 hasher.update_with_bcs_bytes(&count)?;
1026 let cached_entries = self.cached_entries.get_mut().unwrap();
1027 for key in keys {
1028 hasher.update_with_bytes(&key)?;
1029 let hash = if let Some(entry) = self.updates.get_mut(&key) {
1030 let Update::Set(view) = entry else {
1031 unreachable!();
1032 };
1033 let mut view = view
1034 .try_write_arc()
1035 .ok_or_else(|| ViewError::TryLockError(key))?;
1036 view.hash_mut().await?
1037 } else if let Some(view) = cached_entries.get_mut(&key) {
1038 let mut view = view
1039 .try_write_arc()
1040 .ok_or_else(|| ViewError::TryLockError(key))?;
1041 view.hash_mut().await?
1042 } else {
1043 let key = self
1044 .context
1045 .base_key()
1046 .base_tag_index(KeyTag::Subview as u8, &key);
1047 let context = self.context.clone_with_base_key(key);
1048 let mut view = W::load(context).await?;
1049 view.hash_mut().await?
1050 };
1051 hasher.write_all(hash.as_ref())?;
1052 }
1053 Ok(hasher.finalize())
1054 }
1055
1056 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1057 #[cfg(with_metrics)]
1058 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
1059 let mut hasher = sha3::Sha3_256::default();
1060 let keys = self.keys().await?;
1061 let count = keys.len() as u32;
1062 hasher.update_with_bcs_bytes(&count)?;
1063 let mut cached_entries_result = Vec::new();
1064 {
1065 let cached_entries = self.cached_entries.lock().unwrap();
1066 for key in &keys {
1067 cached_entries_result.push(cached_entries.get(key).cloned());
1068 }
1069 }
1070 for (key, cached_entry) in keys.into_iter().zip(cached_entries_result) {
1071 hasher.update_with_bytes(&key)?;
1072 let hash = if let Some(entry) = self.updates.get(&key) {
1073 let Update::Set(view) = entry else {
1074 unreachable!();
1075 };
1076 let view = view
1077 .try_read_arc()
1078 .ok_or_else(|| ViewError::TryLockError(key))?;
1079 view.hash().await?
1080 } else if let Some(view) = cached_entry {
1081 let view = view
1082 .try_read_arc()
1083 .ok_or_else(|| ViewError::TryLockError(key))?;
1084 view.hash().await?
1085 } else {
1086 let key = self
1087 .context
1088 .base_key()
1089 .base_tag_index(KeyTag::Subview as u8, &key);
1090 let context = self.context.clone_with_base_key(key);
1091 let view = W::load(context).await?;
1092 view.hash().await?
1093 };
1094 hasher.write_all(hash.as_ref())?;
1095 }
1096 Ok(hasher.finalize())
1097 }
1098}
1099
1100/// A view that supports accessing a collection of views of the same kind, indexed by keys,
1101/// possibly several subviews at a time.
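///
/// Example (a minimal sketch using the in-memory test context, mirroring the examples on the
/// methods below):
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
///     ReentrantCollectionView::load(context).await.unwrap();
/// {
///     let mut subview = view.try_load_entry_mut(&23).await.unwrap();
///     *subview.get_mut() = "Hello".to_string();
/// }
/// assert_eq!(view.count().await.unwrap(), 1);
/// # })
/// ```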
1102#[derive(Debug)]
1103pub struct ReentrantCollectionView<C, I, W> {
1104 collection: ReentrantByteCollectionView<C, W>,
1105 _phantom: PhantomData<I>,
1106}
1107
1108impl<I, W, C2> ReplaceContext<C2> for ReentrantCollectionView<W::Context, I, W>
1109where
1110 W: View + ReplaceContext<C2>,
1111 I: Send + Sync + Serialize + DeserializeOwned,
1112 C2: Context,
1113{
1114 type Target = ReentrantCollectionView<C2, I, <W as ReplaceContext<C2>>::Target>;
1115
1116 async fn with_context(
1117 &mut self,
1118 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
1119 ) -> Self::Target {
1120 ReentrantCollectionView {
1121 collection: self.collection.with_context(ctx).await,
1122 _phantom: self._phantom,
1123 }
1124 }
1125}
1126
1127impl<I, W> View for ReentrantCollectionView<W::Context, I, W>
1128where
1129 W: View,
1130 I: Send + Sync + Serialize + DeserializeOwned,
1131{
1132 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1133
1134 type Context = W::Context;
1135
1136 fn context(&self) -> &Self::Context {
1137 self.collection.context()
1138 }
1139
1140 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1141 ReentrantByteCollectionView::<W::Context, W>::pre_load(context)
1142 }
1143
1144 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1145 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1146 Ok(ReentrantCollectionView {
1147 collection,
1148 _phantom: PhantomData,
1149 })
1150 }
1151
1152 async fn load(context: Self::Context) -> Result<Self, ViewError> {
1153 Self::post_load(context, &[])
1154 }
1155
1156 fn rollback(&mut self) {
1157 self.collection.rollback()
1158 }
1159
1160 async fn has_pending_changes(&self) -> bool {
1161 self.collection.has_pending_changes().await
1162 }
1163
1164 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1165 self.collection.flush(batch)
1166 }
1167
1168 fn clear(&mut self) {
1169 self.collection.clear()
1170 }
1171}
1172
1173impl<I, W> ClonableView for ReentrantCollectionView<W::Context, I, W>
1174where
1175 W: ClonableView,
1176 I: Send + Sync + Serialize + DeserializeOwned,
1177{
1178 fn clone_unchecked(&mut self) -> Self {
1179 ReentrantCollectionView {
1180 collection: self.collection.clone_unchecked(),
1181 _phantom: PhantomData,
1182 }
1183 }
1184}
1185
1186impl<I, W> ReentrantCollectionView<W::Context, I, W>
1187where
1188 W: View,
1189 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1190{
    /// Loads a subview for the data at the given index in the collection. If an entry
    /// is absent then a default entry is added to the collection. The obtained view can
    /// then be modified.
1194 /// ```rust
1195 /// # tokio_test::block_on(async {
1196 /// # use linera_views::context::MemoryContext;
1197 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1198 /// # use linera_views::register_view::RegisterView;
1199 /// # use linera_views::views::View;
1200 /// # let context = MemoryContext::new_for_testing(());
1201 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1202 /// ReentrantCollectionView::load(context).await.unwrap();
1203 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1204 /// let value = subview.get();
1205 /// assert_eq!(*value, String::default());
1206 /// # })
1207 /// ```
1208 pub async fn try_load_entry_mut<Q>(
1209 &mut self,
1210 index: &Q,
1211 ) -> Result<WriteGuardedView<W>, ViewError>
1212 where
1213 I: Borrow<Q>,
1214 Q: Serialize + ?Sized,
1215 {
1216 let short_key = BaseKey::derive_short_key(index)?;
1217 self.collection.try_load_entry_mut(&short_key).await
1218 }
1219
1220 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1221 /// If an entry is absent then `None` is returned.
1222 /// ```rust
1223 /// # tokio_test::block_on(async {
1224 /// # use linera_views::context::MemoryContext;
1225 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1226 /// # use linera_views::register_view::RegisterView;
1227 /// # use linera_views::views::View;
1228 /// # let context = MemoryContext::new_for_testing(());
1229 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1230 /// ReentrantCollectionView::load(context).await.unwrap();
1231 /// {
1232 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1233 /// }
1234 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1235 /// let value = subview.get();
1236 /// assert_eq!(*value, String::default());
1237 /// # })
1238 /// ```
1239 pub async fn try_load_entry<Q>(
1240 &self,
1241 index: &Q,
1242 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1243 where
1244 I: Borrow<Q>,
1245 Q: Serialize + ?Sized,
1246 {
1247 let short_key = BaseKey::derive_short_key(index)?;
1248 self.collection.try_load_entry(&short_key).await
1249 }
1250
1251 /// Returns `true` if the collection contains a value for the specified key.
1252 /// ```rust
1253 /// # tokio_test::block_on(async {
1254 /// # use linera_views::context::MemoryContext;
1255 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1256 /// # use linera_views::register_view::RegisterView;
1257 /// # use linera_views::views::View;
1258 /// # let context = MemoryContext::new_for_testing(());
1259 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1260 /// ReentrantCollectionView::load(context).await.unwrap();
1261 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1262 /// assert!(view.contains_key(&23).await.unwrap());
1263 /// assert!(!view.contains_key(&24).await.unwrap());
1264 /// # })
1265 /// ```
1266 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1267 where
1268 I: Borrow<Q>,
1269 Q: Serialize + ?Sized,
1270 {
1271 let short_key = BaseKey::derive_short_key(index)?;
1272 self.collection.contains_key(&short_key).await
1273 }
1274
1275 /// Marks the entry so that it is removed in the next flush.
1276 /// ```rust
1277 /// # tokio_test::block_on(async {
1278 /// # use linera_views::context::MemoryContext;
1279 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1280 /// # use linera_views::register_view::RegisterView;
1281 /// # use linera_views::views::View;
1282 /// # let context = MemoryContext::new_for_testing(());
1283 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1284 /// ReentrantCollectionView::load(context).await.unwrap();
1285 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1286 /// let value = subview.get_mut();
1287 /// assert_eq!(*value, String::default());
1288 /// view.remove_entry(&23);
1289 /// let keys = view.indices().await.unwrap();
1290 /// assert_eq!(keys.len(), 0);
1291 /// # })
1292 /// ```
1293 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1294 where
1295 I: Borrow<Q>,
1296 Q: Serialize + ?Sized,
1297 {
1298 let short_key = BaseKey::derive_short_key(index)?;
1299 self.collection.remove_entry(short_key);
1300 Ok(())
1301 }
1302
    /// Marks the entry so that it is reset to the default value in the next flush.
1304 /// ```rust
1305 /// # tokio_test::block_on(async {
1306 /// # use linera_views::context::MemoryContext;
1307 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1308 /// # use linera_views::register_view::RegisterView;
1309 /// # use linera_views::views::View;
1310 /// # let context = MemoryContext::new_for_testing(());
1311 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1312 /// ReentrantCollectionView::load(context).await.unwrap();
1313 /// {
1314 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1315 /// let value = subview.get_mut();
1316 /// *value = String::from("Hello");
1317 /// }
1318 /// view.try_reset_entry_to_default(&23).unwrap();
1319 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1320 /// let value = subview.get_mut();
1321 /// assert_eq!(*value, String::default());
1322 /// # })
1323 /// ```
1324 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1325 where
1326 I: Borrow<Q>,
1327 Q: Serialize + ?Sized,
1328 {
1329 let short_key = BaseKey::derive_short_key(index)?;
1330 self.collection.try_reset_entry_to_default(&short_key)
1331 }
1332
1333 /// Gets the extra data.
1334 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1335 self.collection.extra()
1336 }
1337}
1338
1339impl<I, W> ReentrantCollectionView<W::Context, I, W>
1340where
1341 W: View,
1342 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1343{
    /// Loads multiple entries for writing at once.
    /// The entries in `indices` must all be distinct.
1346 /// ```rust
1347 /// # tokio_test::block_on(async {
1348 /// # use linera_views::context::MemoryContext;
1349 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1350 /// # use linera_views::register_view::RegisterView;
1351 /// # use linera_views::views::View;
1352 /// # let context = MemoryContext::new_for_testing(());
1353 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1354 /// ReentrantCollectionView::load(context).await.unwrap();
1355 /// let indices = vec![23, 42];
1356 /// let subviews = view.try_load_entries_mut(&indices).await.unwrap();
1357 /// let value1 = subviews[0].get();
1358 /// let value2 = subviews[1].get();
1359 /// assert_eq!(*value1, String::default());
1360 /// assert_eq!(*value2, String::default());
1361 /// # })
1362 /// ```
1363 pub async fn try_load_entries_mut<'a, Q>(
1364 &'a mut self,
1365 indices: impl IntoIterator<Item = &'a Q>,
1366 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1367 where
1368 I: Borrow<Q>,
1369 Q: Serialize + 'a,
1370 {
1371 let short_keys = indices
1372 .into_iter()
1373 .map(|index| BaseKey::derive_short_key(index))
1374 .collect::<Result<_, _>>()?;
1375 self.collection.try_load_entries_mut(short_keys).await
1376 }
1377
    /// Loads multiple entries for reading at once.
    /// The entries in `indices` must all be distinct.
1380 /// ```rust
1381 /// # tokio_test::block_on(async {
1382 /// # use linera_views::context::MemoryContext;
1383 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1384 /// # use linera_views::register_view::RegisterView;
1385 /// # use linera_views::views::View;
1386 /// # let context = MemoryContext::new_for_testing(());
1387 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1388 /// ReentrantCollectionView::load(context).await.unwrap();
1389 /// {
1390 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1391 /// }
1392 /// let indices = vec![23, 42];
1393 /// let subviews = view.try_load_entries(&indices).await.unwrap();
1394 /// assert!(subviews[1].is_none());
1395 /// let value0 = subviews[0].as_ref().unwrap().get();
1396 /// assert_eq!(*value0, String::default());
1397 /// # })
1398 /// ```
1399 pub async fn try_load_entries<'a, Q>(
1400 &'a self,
1401 indices: impl IntoIterator<Item = &'a Q>,
1402 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1403 where
1404 I: Borrow<Q>,
1405 Q: Serialize + 'a,
1406 {
1407 let short_keys = indices
1408 .into_iter()
1409 .map(|index| BaseKey::derive_short_key(index))
1410 .collect::<Result<_, _>>()?;
1411 self.collection.try_load_entries(short_keys).await
1412 }
1413
    /// Loads all the entries for writing at once.
1416 /// ```rust
1417 /// # tokio_test::block_on(async {
1418 /// # use linera_views::context::MemoryContext;
1419 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1420 /// # use linera_views::register_view::RegisterView;
1421 /// # use linera_views::views::View;
1422 /// # let context = MemoryContext::new_for_testing(());
1423 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1424 /// ReentrantCollectionView::load(context).await.unwrap();
1425 /// {
1426 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1427 /// }
1428 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
1429 /// assert_eq!(subviews.len(), 1);
1430 /// # })
1431 /// ```
1432 pub async fn try_load_all_entries_mut<'a, Q>(
1433 &'a mut self,
1434 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError>
1435 where
1436 I: Borrow<Q>,
1437 Q: Serialize + 'a,
1438 {
1439 let results = self.collection.try_load_all_entries_mut().await?;
1440 results
1441 .into_iter()
1442 .map(|(short_key, view)| {
1443 let index = BaseKey::deserialize_value(&short_key)?;
1444 Ok((index, view))
1445 })
1446 .collect()
1447 }
1448
    /// Loads all the entries for reading at once.
1451 /// ```rust
1452 /// # tokio_test::block_on(async {
1453 /// # use linera_views::context::MemoryContext;
1454 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1455 /// # use linera_views::register_view::RegisterView;
1456 /// # use linera_views::views::View;
1457 /// # let context = MemoryContext::new_for_testing(());
1458 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1459 /// ReentrantCollectionView::load(context).await.unwrap();
1460 /// {
1461 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1462 /// }
1463 /// let subviews = view.try_load_all_entries().await.unwrap();
1464 /// assert_eq!(subviews.len(), 1);
1465 /// # })
1466 /// ```
1467 pub async fn try_load_all_entries<'a, Q>(
1468 &'a self,
1469 ) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError>
1470 where
1471 I: Borrow<Q>,
1472 Q: Serialize + 'a,
1473 {
1474 let results = self.collection.try_load_all_entries().await?;
1475 results
1476 .into_iter()
1477 .map(|(short_key, view)| {
1478 let index = BaseKey::deserialize_value(&short_key)?;
1479 Ok((index, view))
1480 })
1481 .collect()
1482 }
1483}
1484
1485impl<I, W> ReentrantCollectionView<W::Context, I, W>
1486where
1487 W: View,
1488 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1489{
1490 /// Returns the list of indices in the collection in an order determined
1491 /// by serialization.
1492 /// ```rust
1493 /// # tokio_test::block_on(async {
1494 /// # use linera_views::context::MemoryContext;
1495 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1496 /// # use linera_views::register_view::RegisterView;
1497 /// # use linera_views::views::View;
1498 /// # let context = MemoryContext::new_for_testing(());
1499 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1500 /// ReentrantCollectionView::load(context).await.unwrap();
1501 /// view.try_load_entry_mut(&23).await.unwrap();
1502 /// view.try_load_entry_mut(&25).await.unwrap();
1503 /// let indices = view.indices().await.unwrap();
1504 /// assert_eq!(indices.len(), 2);
1505 /// # })
1506 /// ```
1507 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1508 let mut indices = Vec::new();
1509 self.for_each_index(|index| {
1510 indices.push(index);
1511 Ok(())
1512 })
1513 .await?;
1514 Ok(indices)
1515 }
1516
1517 /// Returns the number of indices in the collection.
1518 /// ```rust
1519 /// # tokio_test::block_on(async {
1520 /// # use linera_views::context::MemoryContext;
1521 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1522 /// # use linera_views::register_view::RegisterView;
1523 /// # use linera_views::views::View;
1524 /// # let context = MemoryContext::new_for_testing(());
1525 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1526 /// ReentrantCollectionView::load(context).await.unwrap();
1527 /// view.try_load_entry_mut(&23).await.unwrap();
1528 /// view.try_load_entry_mut(&25).await.unwrap();
1529 /// assert_eq!(view.count().await.unwrap(), 2);
1530 /// # })
1531 /// ```
1532 pub async fn count(&self) -> Result<usize, ViewError> {
1533 self.collection.count().await
1534 }
1535
    /// Applies a function `f` on each index. Indices are visited in an order
    /// determined by the serialization. If the function `f` returns `false`, then
    /// the loop ends prematurely.
1539 /// ```rust
1540 /// # tokio_test::block_on(async {
1541 /// # use linera_views::context::MemoryContext;
1542 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1543 /// # use linera_views::register_view::RegisterView;
1544 /// # use linera_views::views::View;
1545 /// # let context = MemoryContext::new_for_testing(());
1546 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1547 /// ReentrantCollectionView::load(context).await.unwrap();
1548 /// view.try_load_entry_mut(&23).await.unwrap();
1549 /// view.try_load_entry_mut(&24).await.unwrap();
1550 /// let mut count = 0;
1551 /// view.for_each_index_while(|_key| {
1552 /// count += 1;
1553 /// Ok(count < 1)
1554 /// })
1555 /// .await
1556 /// .unwrap();
1557 /// assert_eq!(count, 1);
1558 /// # })
1559 /// ```
1560 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
1561 where
1562 F: FnMut(I) -> Result<bool, ViewError> + Send,
1563 {
1564 self.collection
1565 .for_each_key_while(|key| {
1566 let index = BaseKey::deserialize_value(key)?;
1567 f(index)
1568 })
1569 .await?;
1570 Ok(())
1571 }
1572
1573 /// Applies a function f on each index. Indices are visited in an order
1574 /// determined by the serialization.
1575 /// ```rust
1576 /// # tokio_test::block_on(async {
1577 /// # use linera_views::context::MemoryContext;
1578 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1579 /// # use linera_views::register_view::RegisterView;
1580 /// # use linera_views::views::View;
1581 /// # let context = MemoryContext::new_for_testing(());
1582 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1583 /// ReentrantCollectionView::load(context).await.unwrap();
1584 /// view.try_load_entry_mut(&23).await.unwrap();
1585 /// view.try_load_entry_mut(&28).await.unwrap();
1586 /// let mut count = 0;
1587 /// view.for_each_index(|_key| {
1588 /// count += 1;
1589 /// Ok(())
1590 /// })
1591 /// .await
1592 /// .unwrap();
1593 /// assert_eq!(count, 2);
1594 /// # })
1595 /// ```
1596 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
1597 where
1598 F: FnMut(I) -> Result<(), ViewError> + Send,
1599 {
1600 self.collection
1601 .for_each_key(|key| {
1602 let index = BaseKey::deserialize_value(key)?;
1603 f(index)
1604 })
1605 .await?;
1606 Ok(())
1607 }
1608}
1609
1610impl<I, W> HashableView for ReentrantCollectionView<W::Context, I, W>
1611where
1612 W: HashableView,
1613 I: Send + Sync + Serialize + DeserializeOwned,
1614{
1615 type Hasher = sha3::Sha3_256;
1616
1617 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1618 self.collection.hash_mut().await
1619 }
1620
1621 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1622 self.collection.hash().await
1623 }
1624}
1625
1626/// A view that supports accessing a collection of views of the same kind, indexed by an ordered key,
1627/// possibly several subviews at a time.
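/// A minimal usage sketch, mirroring the method examples below and assuming the in-memory
/// testing context:
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
///     ReentrantCustomCollectionView::load(context).await.unwrap();
/// // Loading entries mutably creates them with default values if absent.
/// view.try_load_entry_mut(&23).await.unwrap();
/// view.try_load_entry_mut(&25).await.unwrap();
/// assert_eq!(view.indices().await.unwrap(), vec![23, 25]);
/// # })
/// ```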
1628#[derive(Debug)]
1629pub struct ReentrantCustomCollectionView<C, I, W> {
1630 collection: ReentrantByteCollectionView<C, W>,
1631 _phantom: PhantomData<I>,
1632}
1633
1634impl<I, W> View for ReentrantCustomCollectionView<W::Context, I, W>
1635where
1636 W: View,
1637 I: Send + Sync + CustomSerialize,
1638{
1639 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1640
1641 type Context = W::Context;
1642
1643 fn context(&self) -> &Self::Context {
1644 self.collection.context()
1645 }
1646
1647 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1648 ReentrantByteCollectionView::<_, W>::pre_load(context)
1649 }
1650
1651 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1652 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1653 Ok(ReentrantCustomCollectionView {
1654 collection,
1655 _phantom: PhantomData,
1656 })
1657 }
1658
1659 async fn load(context: Self::Context) -> Result<Self, ViewError> {
1660 Self::post_load(context, &[])
1661 }
1662
1663 fn rollback(&mut self) {
1664 self.collection.rollback()
1665 }
1666
1667 async fn has_pending_changes(&self) -> bool {
1668 self.collection.has_pending_changes().await
1669 }
1670
1671 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1672 self.collection.flush(batch)
1673 }
1674
1675 fn clear(&mut self) {
1676 self.collection.clear()
1677 }
1678}
1679
1680impl<I, W> ClonableView for ReentrantCustomCollectionView<W::Context, I, W>
1681where
1682 W: ClonableView,
1683 Self: View,
1684{
1685 fn clone_unchecked(&mut self) -> Self {
1686 ReentrantCustomCollectionView {
1687 collection: self.collection.clone_unchecked(),
1688 _phantom: PhantomData,
1689 }
1690 }
1691}
1692
1693impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
1694where
1695 W: View,
1696 I: Sync + Clone + Send + CustomSerialize,
1697{
1698 /// Loads a subview for the data at the given index in the collection. If an entry
1699 /// is absent then a default entry is added to the collection at this index.
1700 /// ```rust
1701 /// # tokio_test::block_on(async {
1702 /// # use linera_views::context::MemoryContext;
1703 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1704 /// # use linera_views::register_view::RegisterView;
1705 /// # use linera_views::views::View;
1706 /// # let context = MemoryContext::new_for_testing(());
1707 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1708 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1709 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1710 /// let value = subview.get();
1711 /// assert_eq!(*value, String::default());
1712 /// # })
1713 /// ```
1714 pub async fn try_load_entry_mut<Q>(
1715 &mut self,
1716 index: &Q,
1717 ) -> Result<WriteGuardedView<W>, ViewError>
1718 where
1719 I: Borrow<Q>,
1720 Q: CustomSerialize,
1721 {
1722 let short_key = index.to_custom_bytes()?;
1723 self.collection.try_load_entry_mut(&short_key).await
1724 }
1725
1726 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1727 /// If an entry is absent then `None` is returned.
1728 /// ```rust
1729 /// # tokio_test::block_on(async {
1730 /// # use linera_views::context::MemoryContext;
1731 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1732 /// # use linera_views::register_view::RegisterView;
1733 /// # use linera_views::views::View;
1734 /// # let context = MemoryContext::new_for_testing(());
1735 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1736 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1737 /// {
1738 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1739 /// }
1740 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1741 /// let value = subview.get();
1742 /// assert_eq!(*value, String::default());
1743 /// # })
1744 /// ```
1745 pub async fn try_load_entry<Q>(
1746 &self,
1747 index: &Q,
1748 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1749 where
1750 I: Borrow<Q>,
1751 Q: CustomSerialize,
1752 {
1753 let short_key = index.to_custom_bytes()?;
1754 self.collection.try_load_entry(&short_key).await
1755 }
1756
1757 /// Returns `true` if the collection contains a value for the specified key.
1758 /// ```rust
1759 /// # tokio_test::block_on(async {
1760 /// # use linera_views::context::MemoryContext;
1761 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1762 /// # use linera_views::register_view::RegisterView;
1763 /// # use linera_views::views::View;
1764 /// # let context = MemoryContext::new_for_testing(());
1765 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1766 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1767 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1768 /// assert!(view.contains_key(&23).await.unwrap());
1769 /// assert!(!view.contains_key(&24).await.unwrap());
1770 /// # })
1771 /// ```
1772 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1773 where
1774 I: Borrow<Q>,
1775 Q: CustomSerialize,
1776 {
1777 let short_key = index.to_custom_bytes()?;
1778 self.collection.contains_key(&short_key).await
1779 }
1780
1781 /// Removes an entry. If absent then nothing happens.
1782 /// ```rust
1783 /// # tokio_test::block_on(async {
1784 /// # use linera_views::context::MemoryContext;
1785 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1786 /// # use linera_views::register_view::RegisterView;
1787 /// # use linera_views::views::View;
1788 /// # let context = MemoryContext::new_for_testing(());
1789 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1790 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1791 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1792 /// let value = subview.get_mut();
1793 /// assert_eq!(*value, String::default());
1794 /// view.remove_entry(&23).unwrap();
1795 /// let keys = view.indices().await.unwrap();
1796 /// assert_eq!(keys.len(), 0);
1797 /// # })
1798 /// ```
1799 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1800 where
1801 I: Borrow<Q>,
1802 Q: CustomSerialize,
1803 {
1804 let short_key = index.to_custom_bytes()?;
1805 self.collection.remove_entry(short_key);
1806 Ok(())
1807 }
1808
1809 /// Marks the entry so that it is reset to the default value in the next flush.
1810 /// ```rust
1811 /// # tokio_test::block_on(async {
1812 /// # use linera_views::context::MemoryContext;
1813 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1814 /// # use linera_views::register_view::RegisterView;
1815 /// # use linera_views::views::View;
1816 /// # let context = MemoryContext::new_for_testing(());
1817 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1818 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1819 /// {
1820 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1821 /// let value = subview.get_mut();
1822 /// *value = String::from("Hello");
1823 /// }
1824 /// {
1825 /// view.try_reset_entry_to_default(&23).unwrap();
1826 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1827 /// let value = subview.get();
1828 /// assert_eq!(*value, String::default());
1829 /// }
1830 /// # })
1831 /// ```
1832 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1833 where
1834 I: Borrow<Q>,
1835 Q: CustomSerialize,
1836 {
1837 let short_key = index.to_custom_bytes()?;
1838 self.collection.try_reset_entry_to_default(&short_key)
1839 }
1840
1841 /// Gets the extra data.
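    /// A minimal sketch, assuming the in-memory testing context whose extra data is `()`:
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
    /// # use linera_views::register_view::RegisterView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
    ///     ReentrantCustomCollectionView::load(context).await.unwrap();
    /// // The extra data is whatever was attached to the context; here it is the unit value.
    /// assert_eq!(*view.extra(), ());
    /// # })
    /// ```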
1842 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1843 self.collection.extra()
1844 }
1845}
1846
1847impl<I, W: View> ReentrantCustomCollectionView<W::Context, I, W>
1848where
1849 I: Sync + Clone + Send + CustomSerialize,
1850{
1851 /// Loads multiple entries for writing at once.
1852 /// The entries in `indices` have to be all distinct.
1853 /// ```rust
1854 /// # tokio_test::block_on(async {
1855 /// # use linera_views::context::MemoryContext;
1856 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1857 /// # use linera_views::register_view::RegisterView;
1858 /// # use linera_views::views::View;
1859 /// # let context = MemoryContext::new_for_testing(());
1860 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1861 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1862 /// let indices = vec![23, 42];
1863 /// let subviews = view.try_load_entries_mut(indices).await.unwrap();
1864 /// let value1 = subviews[0].get();
1865 /// let value2 = subviews[1].get();
1866 /// assert_eq!(*value1, String::default());
1867 /// assert_eq!(*value2, String::default());
1868 /// # })
1869 /// ```
1870 pub async fn try_load_entries_mut<Q>(
1871 &mut self,
1872 indices: impl IntoIterator<Item = Q>,
1873 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1874 where
1875 I: Borrow<Q>,
1876 Q: CustomSerialize,
1877 {
1878 let short_keys = indices
1879 .into_iter()
1880 .map(|index| index.to_custom_bytes())
1881 .collect::<Result<_, _>>()?;
1882 self.collection.try_load_entries_mut(short_keys).await
1883 }
1884
1885 /// Loads multiple entries for reading at once.
1886 /// The entries in `indices` have to be all distinct.
1887 /// ```rust
1888 /// # tokio_test::block_on(async {
1889 /// # use linera_views::context::MemoryContext;
1890 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1891 /// # use linera_views::register_view::RegisterView;
1892 /// # use linera_views::views::View;
1893 /// # let context = MemoryContext::new_for_testing(());
1894 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1895 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1896 /// {
1897 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1898 /// }
1899 /// let indices = vec![23, 42];
1900 /// let subviews = view.try_load_entries(indices).await.unwrap();
1901 /// assert!(subviews[1].is_none());
1902 /// let value0 = subviews[0].as_ref().unwrap().get();
1903 /// assert_eq!(*value0, String::default());
1904 /// # })
1905 /// ```
1906 pub async fn try_load_entries<Q>(
1907 &self,
1908 indices: impl IntoIterator<Item = Q>,
1909 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1910 where
1911 I: Borrow<Q>,
1912 Q: CustomSerialize,
1913 {
1914 let short_keys = indices
1915 .into_iter()
1916 .map(|index| index.to_custom_bytes())
1917 .collect::<Result<_, _>>()?;
1918 self.collection.try_load_entries(short_keys).await
1919 }
1920
1921 /// Loads all entries for writing at once, together with their indices.
1923 /// ```rust
1924 /// # tokio_test::block_on(async {
1925 /// # use linera_views::context::MemoryContext;
1926 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1927 /// # use linera_views::register_view::RegisterView;
1928 /// # use linera_views::views::View;
1929 /// # let context = MemoryContext::new_for_testing(());
1930 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1931 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1932 /// {
1933 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1934 /// }
1935 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
1936 /// assert_eq!(subviews.len(), 1);
1937 /// # })
1938 /// ```
1939 pub async fn try_load_all_entries_mut(
1940 &mut self,
1941 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError> {
1946 let results = self.collection.try_load_all_entries_mut().await?;
1947 results
1948 .into_iter()
1949 .map(|(short_key, view)| {
1950 let index = I::from_custom_bytes(&short_key)?;
1951 Ok((index, view))
1952 })
1953 .collect()
1954 }
1955
1956 /// Loads all entries for reading at once, together with their indices.
1958 /// ```rust
1959 /// # tokio_test::block_on(async {
1960 /// # use linera_views::context::MemoryContext;
1961 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1962 /// # use linera_views::register_view::RegisterView;
1963 /// # use linera_views::views::View;
1964 /// # let context = MemoryContext::new_for_testing(());
1965 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1966 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1967 /// {
1968 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1969 /// }
1970 /// let subviews = view.try_load_all_entries().await.unwrap();
1971 /// assert_eq!(subviews.len(), 1);
1972 /// # })
1973 /// ```
1974 pub async fn try_load_all_entries(&self) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError> {
1979 let results = self.collection.try_load_all_entries().await?;
1980 results
1981 .into_iter()
1982 .map(|(short_key, view)| {
1983 let index = I::from_custom_bytes(&short_key)?;
1984 Ok((index, view))
1985 })
1986 .collect()
1987 }
1988}
1989
1990impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
1991where
1992 W: View,
1993 I: Sync + Clone + Send + CustomSerialize,
1994{
1995 /// Returns the list of indices in the collection. The order is determined by
1996 /// the custom serialization.
1997 /// ```rust
1998 /// # tokio_test::block_on(async {
1999 /// # use linera_views::context::MemoryContext;
2000 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2001 /// # use linera_views::register_view::RegisterView;
2002 /// # use linera_views::views::View;
2003 /// # let context = MemoryContext::new_for_testing(());
2004 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2005 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2006 /// view.try_load_entry_mut(&23).await.unwrap();
2007 /// view.try_load_entry_mut(&25).await.unwrap();
2008 /// let indices = view.indices().await.unwrap();
2009 /// assert_eq!(indices, vec![23, 25]);
2010 /// # })
2011 /// ```
2012 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
2013 let mut indices = Vec::new();
2014 self.for_each_index(|index| {
2015 indices.push(index);
2016 Ok(())
2017 })
2018 .await?;
2019 Ok(indices)
2020 }
2021
2022 /// Returns the number of entries in the collection.
2023 /// ```rust
2024 /// # tokio_test::block_on(async {
2025 /// # use linera_views::context::MemoryContext;
2026 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2027 /// # use linera_views::register_view::RegisterView;
2028 /// # use linera_views::views::View;
2029 /// # let context = MemoryContext::new_for_testing(());
2030 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2031 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2032 /// view.try_load_entry_mut(&23).await.unwrap();
2033 /// view.try_load_entry_mut(&25).await.unwrap();
2034 /// assert_eq!(view.count().await.unwrap(), 2);
2035 /// # })
2036 /// ```
2037 pub async fn count(&self) -> Result<usize, ViewError> {
2038 self.collection.count().await
2039 }
2040
2041 /// Applies a function `f` on each index. Indices are visited in an order
2042 /// determined by the custom serialization. If `f` returns `false`, the loop
2043 /// ends prematurely.
2044 /// ```rust
2045 /// # tokio_test::block_on(async {
2046 /// # use linera_views::context::MemoryContext;
2047 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2048 /// # use linera_views::register_view::RegisterView;
2049 /// # use linera_views::views::View;
2050 /// # let context = MemoryContext::new_for_testing(());
2051 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2052 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2053 /// view.try_load_entry_mut(&28).await.unwrap();
2054 /// view.try_load_entry_mut(&24).await.unwrap();
2055 /// view.try_load_entry_mut(&23).await.unwrap();
2056 /// let mut part_indices = Vec::new();
2057 /// view.for_each_index_while(|index| {
2058 /// part_indices.push(index);
2059 /// Ok(part_indices.len() < 2)
2060 /// })
2061 /// .await
2062 /// .unwrap();
2063 /// assert_eq!(part_indices, vec![23, 24]);
2064 /// # })
2065 /// ```
2066 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
2067 where
2068 F: FnMut(I) -> Result<bool, ViewError> + Send,
2069 {
2070 self.collection
2071 .for_each_key_while(|key| {
2072 let index = I::from_custom_bytes(key)?;
2073 f(index)
2074 })
2075 .await?;
2076 Ok(())
2077 }
2078
2079 /// Applies a function `f` on each index. Indices are visited in an order
2080 /// determined by the custom serialization.
2081 /// ```rust
2082 /// # tokio_test::block_on(async {
2083 /// # use linera_views::context::MemoryContext;
2084 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2085 /// # use linera_views::register_view::RegisterView;
2086 /// # use linera_views::views::View;
2087 /// # let context = MemoryContext::new_for_testing(());
2088 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2089 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2090 /// view.try_load_entry_mut(&28).await.unwrap();
2091 /// view.try_load_entry_mut(&24).await.unwrap();
2092 /// view.try_load_entry_mut(&23).await.unwrap();
2093 /// let mut indices = Vec::new();
2094 /// view.for_each_index(|index| {
2095 /// indices.push(index);
2096 /// Ok(())
2097 /// })
2098 /// .await
2099 /// .unwrap();
2100 /// assert_eq!(indices, vec![23, 24, 28]);
2101 /// # })
2102 /// ```
2103 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
2104 where
2105 F: FnMut(I) -> Result<(), ViewError> + Send,
2106 {
2107 self.collection
2108 .for_each_key(|key| {
2109 let index = I::from_custom_bytes(key)?;
2110 f(index)
2111 })
2112 .await?;
2113 Ok(())
2114 }
2115}
2116
2117impl<I, W> HashableView for ReentrantCustomCollectionView<W::Context, I, W>
2118where
2119 W: HashableView,
2120 I: Send + Sync + CustomSerialize,
2121{
2122 type Hasher = sha3::Sha3_256;
2123
2124 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2125 self.collection.hash_mut().await
2126 }
2127
2128 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2129 self.collection.hash().await
2130 }
2131}
2132
2133/// Type wrapping `ReentrantByteCollectionView` while memoizing the hash.
2134pub type HashedReentrantByteCollectionView<C, W> =
2135 WrappedHashableContainerView<C, ReentrantByteCollectionView<C, W>, HasherOutput>;
2136
2137/// Type wrapping `ReentrantCollectionView` while memoizing the hash.
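/// A minimal sketch (assuming the in-memory testing context): the wrapper exposes the same
/// `HashableView` interface, memoizing the hash between calls.
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::HashedReentrantCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::{HashableView, View};
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: HashedReentrantCollectionView<_, u64, RegisterView<_, String>> =
///     HashedReentrantCollectionView::load(context).await.unwrap();
/// // Repeated calls on an unchanged view return the same (memoized) hash.
/// let hash1 = view.hash_mut().await.unwrap();
/// let hash2 = view.hash_mut().await.unwrap();
/// assert_eq!(hash1, hash2);
/// # })
/// ```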
2138pub type HashedReentrantCollectionView<C, I, W> =
2139 WrappedHashableContainerView<C, ReentrantCollectionView<C, I, W>, HasherOutput>;
2140
2141/// Type wrapping `ReentrantCustomCollectionView` while memoizing the hash.
2142pub type HashedReentrantCustomCollectionView<C, I, W> =
2143 WrappedHashableContainerView<C, ReentrantCustomCollectionView<C, I, W>, HasherOutput>;
2144
2145#[cfg(with_graphql)]
2146mod graphql {
2147 use std::borrow::Cow;
2148
2149 use super::{ReadGuardedView, ReentrantCollectionView};
2150 use crate::{
2151 graphql::{hash_name, mangle, missing_key_error, Entry, MapFilters, MapInput},
2152 views::View,
2153 };
2154
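    // `ReadGuardedView` is transparent for GraphQL: it reuses the inner type's schema
    // information and delegates field resolution to the guarded subview.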
2155 impl<T: async_graphql::OutputType> async_graphql::OutputType for ReadGuardedView<T> {
2156 fn type_name() -> Cow<'static, str> {
2157 T::type_name()
2158 }
2159
2160 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
2161 T::create_type_info(registry)
2162 }
2163
2164 async fn resolve(
2165 &self,
2166 ctx: &async_graphql::ContextSelectionSet<'_>,
2167 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
2168 ) -> async_graphql::ServerResult<async_graphql::Value> {
2169 (**self).resolve(ctx, field).await
2170 }
2171 }
2172
2173 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2174 async_graphql::TypeName for ReentrantCollectionView<C, K, V>
2175 {
2176 fn type_name() -> Cow<'static, str> {
2177 format!(
2178 "ReentrantCollectionView_{}_{}_{:08x}",
2179 mangle(K::type_name()),
2180 mangle(V::type_name()),
2181 hash_name::<(K, V)>(),
2182 )
2183 .into()
2184 }
2185 }
2186
2187 #[async_graphql::Object(cache_control(no_cache), name_type)]
2188 impl<K, V> ReentrantCollectionView<V::Context, K, V>
2189 where
2190 K: async_graphql::InputType
2191 + async_graphql::OutputType
2192 + serde::ser::Serialize
2193 + serde::de::DeserializeOwned
2194 + std::fmt::Debug
2195 + Clone,
2196 V: View + async_graphql::OutputType,
2197 MapInput<K>: async_graphql::InputType,
2198 MapFilters<K>: async_graphql::InputType,
2199 {
2200 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2201 Ok(self.indices().await?)
2202 }
2203
2204 async fn entry(
2205 &self,
2206 key: K,
2207 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2208 let value = self
2209 .try_load_entry(&key)
2210 .await?
2211 .ok_or_else(|| missing_key_error(&key))?;
2212 Ok(Entry { value, key })
2213 }
2214
2215 async fn entries(
2216 &self,
2217 input: Option<MapInput<K>>,
2218 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
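            // Resolve only the keys requested through the input filters; if no filter is
            // given, fall back to listing every index in the collection.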
2219 let keys = if let Some(keys) = input
2220 .and_then(|input| input.filters)
2221 .and_then(|filters| filters.keys)
2222 {
2223 keys
2224 } else {
2225 self.indices().await?
2226 };
2227
2228 let mut values = vec![];
2229 for key in keys {
2230 let value = self
2231 .try_load_entry(&key)
2232 .await?
2233 .ok_or_else(|| missing_key_error(&key))?;
2234 values.push(Entry { value, key })
2235 }
2236
2237 Ok(values)
2238 }
2239 }
2240
2241 use crate::reentrant_collection_view::ReentrantCustomCollectionView;
2242 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2243 async_graphql::TypeName for ReentrantCustomCollectionView<C, K, V>
2244 {
2245 fn type_name() -> Cow<'static, str> {
2246 format!(
2247 "ReentrantCustomCollectionView_{}_{}_{:08x}",
2248 mangle(K::type_name()),
2249 mangle(V::type_name()),
2250 hash_name::<(K, V)>(),
2251 )
2252 .into()
2253 }
2254 }
2255
2256 #[async_graphql::Object(cache_control(no_cache), name_type)]
2257 impl<K, V> ReentrantCustomCollectionView<V::Context, K, V>
2258 where
2259 K: async_graphql::InputType
2260 + async_graphql::OutputType
2261 + crate::common::CustomSerialize
2262 + std::fmt::Debug
2263 + Clone,
2264 V: View + async_graphql::OutputType,
2265 MapInput<K>: async_graphql::InputType,
2266 MapFilters<K>: async_graphql::InputType,
2267 {
2268 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2269 Ok(self.indices().await?)
2270 }
2271
2272 async fn entry(
2273 &self,
2274 key: K,
2275 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2276 let value = self
2277 .try_load_entry(&key)
2278 .await?
2279 .ok_or_else(|| missing_key_error(&key))?;
2280 Ok(Entry { value, key })
2281 }
2282
2283 async fn entries(
2284 &self,
2285 input: Option<MapInput<K>>,
2286 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
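            // Resolve only the keys requested through the input filters; if no filter is
            // given, fall back to listing every index in the collection.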
2287 let keys = if let Some(keys) = input
2288 .and_then(|input| input.filters)
2289 .and_then(|filters| filters.keys)
2290 {
2291 keys
2292 } else {
2293 self.indices().await?
2294 };
2295
2296 let mut values = vec![];
2297 for key in keys {
2298 let value = self
2299 .try_load_entry(&key)
2300 .await?
2301 .ok_or_else(|| missing_key_error(&key))?;
2302 values.push(Entry { value, key })
2303 }
2304
2305 Ok(values)
2306 }
2307 }
2308}