linera_views/views/reentrant_collection_view.rs
1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5 borrow::Borrow,
6 collections::{btree_map, BTreeMap},
7 io::Write,
8 marker::PhantomData,
9 mem,
10 sync::{Arc, Mutex},
11};
12
13use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc};
14#[cfg(with_metrics)]
15use linera_base::prometheus_util::MeasureLatency as _;
16use serde::{de::DeserializeOwned, Serialize};
17
18use crate::{
19 batch::Batch,
20 common::{CustomSerialize, HasherOutput, Update},
21 context::{BaseKey, Context},
22 hashable_wrapper::WrappedHashableContainerView,
23 store::{KeyIterable, ReadableKeyValueStore as _},
24 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
25};
26
27#[cfg(with_metrics)]
28mod metrics {
29 use std::sync::LazyLock;
30
31 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
32 use prometheus::HistogramVec;
33
34 /// The runtime of hash computation
35 pub static REENTRANT_COLLECTION_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
36 LazyLock::new(|| {
37 register_histogram_vec(
38 "reentrant_collection_view_hash_runtime",
39 "ReentrantCollectionView hash runtime",
40 &[],
41 exponential_bucket_latencies(5.0),
42 )
43 });
44}
45
46/// A read-only accessor for a particular subview in a [`ReentrantCollectionView`].
47#[derive(Debug)]
48pub struct ReadGuardedView<T>(RwLockReadGuardArc<T>);
49
50impl<T> std::ops::Deref for ReadGuardedView<T> {
51 type Target = T;
52 fn deref(&self) -> &T {
53 self.0.deref()
54 }
55}
56
57/// A read-write accessor for a particular subview in a [`ReentrantCollectionView`].
58#[derive(Debug)]
59pub struct WriteGuardedView<T>(RwLockWriteGuardArc<T>);
60
61impl<T> std::ops::Deref for WriteGuardedView<T> {
62 type Target = T;
63 fn deref(&self) -> &T {
64 self.0.deref()
65 }
66}
67
68impl<T> std::ops::DerefMut for WriteGuardedView<T> {
69 fn deref_mut(&mut self) -> &mut T {
70 self.0.deref_mut()
71 }
72}
73
74/// A view that supports accessing a collection of views of the same kind, indexed by `Vec<u8>`,
75/// possibly several subviews at a time.
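/// A minimal usage sketch, mirroring the doctests of the methods below and assuming the
/// in-memory test context they use:
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
///     ReentrantByteCollectionView::load(context).await.unwrap();
/// {
///     let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
///     *subview.get_mut() = "Hello".to_string();
/// }
/// let subview = view.try_load_entry(&[0, 1]).await.unwrap().unwrap();
/// assert_eq!(*subview.get(), "Hello".to_string());
/// # })
/// ```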
76#[derive(Debug)]
77pub struct ReentrantByteCollectionView<C, W> {
78 /// The view [`Context`].
79 context: C,
80 /// Whether the current persisted data will be completely erased and replaced on the next flush.
81 delete_storage_first: bool,
82 /// Entries that may have staged changes.
83 updates: BTreeMap<Vec<u8>, Update<Arc<RwLock<W>>>>,
84 /// Entries cached in memory that have the exact same state as in the persistent storage.
85 cached_entries: Mutex<BTreeMap<Vec<u8>, Arc<RwLock<W>>>>,
86}
87
88/// We need to find new base keys in order to implement the collection view.
89/// We do this by appending a value to the base key.
90///
91/// Sub-views in a collection share a common key prefix, like in other view types. However,
92/// just concatenating the shared prefix with sub-view keys would make it impossible to distinguish
93/// whether a given key belongs to a child sub-view or to a grandchild sub-view (consider, for
94/// example, a collection stored inside another collection).
95#[repr(u8)]
96enum KeyTag {
97 /// Prefix for the index keys, which mark the existence of an entry in the collection.
98 Index = MIN_VIEW_TAG,
99 /// Prefix for the keys of the sub-view itself.
100 Subview,
101}
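// Illustrative key layout, assuming a base key `B` and a short key `k`:
// - index key:   `B ++ [KeyTag::Index as u8] ++ k`, written with an empty value to mark that the
//   entry exists;
// - subview key: `B ++ [KeyTag::Subview as u8] ++ k`, used as the base key (prefix) of the
//   sub-view itself.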
102
103impl<W: View> View for ReentrantByteCollectionView<W::Context, W> {
104 const NUM_INIT_KEYS: usize = 0;
105
106 type Context = W::Context;
107
108 fn context(&self) -> &Self::Context {
109 &self.context
110 }
111
112 fn pre_load(_context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
113 Ok(Vec::new())
114 }
115
116 fn post_load(context: Self::Context, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
117 Ok(Self {
118 context,
119 delete_storage_first: false,
120 updates: BTreeMap::new(),
121 cached_entries: Mutex::new(BTreeMap::new()),
122 })
123 }
124
125 async fn load(context: Self::Context) -> Result<Self, ViewError> {
126 Self::post_load(context, &[])
127 }
128
129 fn rollback(&mut self) {
130 self.delete_storage_first = false;
131 self.updates.clear();
132 }
133
134 async fn has_pending_changes(&self) -> bool {
135 if self.delete_storage_first {
136 return true;
137 }
138 !self.updates.is_empty()
139 }
140
141 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
142 let mut delete_view = false;
143 if self.delete_storage_first {
144 delete_view = true;
145 batch.delete_key_prefix(self.context.base_key().bytes.clone());
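            // The whole key prefix is deleted above, so any staged `Set` entries must be flushed
            // again and re-indexed; if at least one such entry exists, the view is not considered
            // fully deleted.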
146 for (index, update) in mem::take(&mut self.updates) {
147 if let Update::Set(view) = update {
148 let mut view = Arc::try_unwrap(view)
149 .map_err(|_| ViewError::TryLockError(index.clone()))?
150 .into_inner();
151 view.flush(batch)?;
152 self.add_index(batch, &index);
153 delete_view = false;
154 }
155 }
156 } else {
157 for (index, update) in mem::take(&mut self.updates) {
158 match update {
159 Update::Set(view) => {
160 let mut view = Arc::try_unwrap(view)
161 .map_err(|_| ViewError::TryLockError(index.clone()))?
162 .into_inner();
163 view.flush(batch)?;
164 self.add_index(batch, &index);
165 }
166 Update::Removed => {
167 let key_subview = self.get_subview_key(&index);
168 let key_index = self.get_index_key(&index);
169 batch.delete_key(key_index);
170 batch.delete_key_prefix(key_subview);
171 }
172 }
173 }
174 }
175 self.delete_storage_first = false;
176 Ok(delete_view)
177 }
178
179 fn clear(&mut self) {
180 self.delete_storage_first = true;
181 self.updates.clear();
182 self.cached_entries.get_mut().unwrap().clear();
183 }
184}
185
186impl<W: ClonableView> ClonableView for ReentrantByteCollectionView<W::Context, W> {
187 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
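        // Cloning requires exclusive access to every staged sub-view; the cache of clean entries
        // is not carried over to the clone.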
188 let cloned_updates = self
189 .updates
190 .iter()
191 .map(|(key, value)| {
192 let cloned_value = match value {
193 Update::Removed => Update::Removed,
194 Update::Set(view_lock) => {
195 let mut view = view_lock
196 .try_write()
197 .ok_or(ViewError::TryLockError(key.to_vec()))?;
198
199 Update::Set(Arc::new(RwLock::new(view.clone_unchecked()?)))
200 }
201 };
202 Ok((key.clone(), cloned_value))
203 })
204 .collect::<Result<_, ViewError>>()?;
205
206 Ok(ReentrantByteCollectionView {
207 context: self.context.clone(),
208 delete_storage_first: self.delete_storage_first,
209 updates: cloned_updates,
210 cached_entries: Mutex::new(BTreeMap::new()),
211 })
212 }
213}
214
215impl<C: Context, W> ReentrantByteCollectionView<C, W> {
216 fn get_index_key(&self, index: &[u8]) -> Vec<u8> {
217 self.context
218 .base_key()
219 .base_tag_index(KeyTag::Index as u8, index)
220 }
221
222 fn get_subview_key(&self, index: &[u8]) -> Vec<u8> {
223 self.context
224 .base_key()
225 .base_tag_index(KeyTag::Subview as u8, index)
226 }
227
228 fn add_index(&self, batch: &mut Batch, index: &[u8]) {
229 let key = self.get_index_key(index);
230 batch.put_key_value_bytes(key, vec![]);
231 }
232}
233
234impl<W: View> ReentrantByteCollectionView<W::Context, W> {
235 /// Reads the view from storage; if it is missing, returns the default view.
236 async fn wrapped_view(
237 context: &W::Context,
238 delete_storage_first: bool,
239 short_key: &[u8],
240 ) -> Result<Arc<RwLock<W>>, ViewError> {
241 let key = context
242 .base_key()
243 .base_tag_index(KeyTag::Subview as u8, short_key);
244 let context = context.clone_with_base_key(key);
245 // Obtain a view and set its pending state to the default (e.g. empty) state
246 let view = if delete_storage_first {
247 W::new(context)?
248 } else {
249 W::load(context).await?
250 };
251 Ok(Arc::new(RwLock::new(view)))
252 }
253
254 /// Loads the view and inserts it into `updates` if needed.
255 /// If the entry is missing, it is set to the default view.
256 async fn try_load_view_mut(&mut self, short_key: &[u8]) -> Result<Arc<RwLock<W>>, ViewError> {
257 use btree_map::Entry::*;
258 Ok(match self.updates.entry(short_key.to_owned()) {
259 Occupied(mut entry) => match entry.get_mut() {
260 Update::Set(view) => view.clone(),
261 entry @ Update::Removed => {
262 let wrapped_view = Self::wrapped_view(&self.context, true, short_key).await?;
263 *entry = Update::Set(wrapped_view.clone());
264 wrapped_view
265 }
266 },
267 Vacant(entry) => {
268 let wrapped_view = match self.cached_entries.get_mut().unwrap().remove(short_key) {
269 Some(view) => view,
270 None => {
271 Self::wrapped_view(&self.context, self.delete_storage_first, short_key)
272 .await?
273 }
274 };
275 entry.insert(Update::Set(wrapped_view.clone()));
276 wrapped_view
277 }
278 })
279 }
280
281 /// Loads the view from `updates` if available.
282 /// Otherwise, the entry is loaded from storage; if it is
283 /// missing there too, `None` is returned.
284 async fn try_load_view(&self, short_key: &[u8]) -> Result<Option<Arc<RwLock<W>>>, ViewError> {
285 Ok(if let Some(entry) = self.updates.get(short_key) {
286 match entry {
287 Update::Set(view) => Some(view.clone()),
288 _entry @ Update::Removed => None,
289 }
290 } else if self.delete_storage_first {
291 None
292 } else {
293 let view = {
294 let cached_entries = self.cached_entries.lock().unwrap();
295 let view = cached_entries.get(short_key);
296 view.cloned()
297 };
298 if let Some(view) = view {
299 Some(view.clone())
300 } else {
301 let key_index = self
302 .context
303 .base_key()
304 .base_tag_index(KeyTag::Index as u8, short_key);
305 if self.context.store().contains_key(&key_index).await? {
306 let view = Self::wrapped_view(&self.context, false, short_key).await?;
307 let mut cached_entries = self.cached_entries.lock().unwrap();
308 cached_entries.insert(short_key.to_owned(), view.clone());
309 Some(view)
310 } else {
311 None
312 }
313 }
314 })
315 }
316
317 /// Loads a subview for the data at the given index in the collection. If an entry
318 /// is absent then a default entry is added to the collection. The resulting view
319 /// can be modified.
320 /// ```rust
321 /// # tokio_test::block_on(async {
322 /// # use linera_views::context::MemoryContext;
323 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
324 /// # use linera_views::register_view::RegisterView;
325 /// # use linera_views::views::View;
326 /// # let context = MemoryContext::new_for_testing(());
327 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
328 /// ReentrantByteCollectionView::load(context).await.unwrap();
329 /// let subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
330 /// let value = subview.get();
331 /// assert_eq!(*value, String::default());
332 /// # })
333 /// ```
334 pub async fn try_load_entry_mut(
335 &mut self,
336 short_key: &[u8],
337 ) -> Result<WriteGuardedView<W>, ViewError> {
338 Ok(WriteGuardedView(
339 self.try_load_view_mut(short_key)
340 .await?
341 .try_write_arc()
342 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
343 ))
344 }
345
346 /// Loads a subview at the given index in the collection and gives read-only access to the data.
347 /// If an entry is absent then `None` is returned.
348 /// ```rust
349 /// # tokio_test::block_on(async {
350 /// # use linera_views::context::MemoryContext;
351 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
352 /// # use linera_views::register_view::RegisterView;
353 /// # use linera_views::views::View;
354 /// # let context = MemoryContext::new_for_testing(());
355 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
356 /// ReentrantByteCollectionView::load(context).await.unwrap();
357 /// {
358 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
359 /// }
360 /// let subview = view.try_load_entry(&[0, 1]).await.unwrap().unwrap();
361 /// let value = subview.get();
362 /// assert_eq!(*value, String::default());
363 /// # })
364 /// ```
365 pub async fn try_load_entry(
366 &self,
367 short_key: &[u8],
368 ) -> Result<Option<ReadGuardedView<W>>, ViewError> {
369 match self.try_load_view(short_key).await? {
370 None => Ok(None),
371 Some(view) => Ok(Some(ReadGuardedView(
372 view.try_read_arc()
373 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
374 ))),
375 }
376 }
377
378 /// Returns `true` if the collection contains a value for the specified key.
379 /// ```rust
380 /// # tokio_test::block_on(async {
381 /// # use linera_views::context::MemoryContext;
382 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
383 /// # use linera_views::register_view::RegisterView;
384 /// # use linera_views::views::View;
385 /// # let context = MemoryContext::new_for_testing(());
386 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
387 /// ReentrantByteCollectionView::load(context).await.unwrap();
388 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
389 /// assert!(view.contains_key(&[0, 1]).await.unwrap());
390 /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
391 /// # })
392 /// ```
393 pub async fn contains_key(&self, short_key: &[u8]) -> Result<bool, ViewError> {
394 Ok(if let Some(entry) = self.updates.get(short_key) {
395 match entry {
396 Update::Set(_view) => true,
397 Update::Removed => false,
398 }
399 } else if self.delete_storage_first {
400 false
401 } else if self.cached_entries.lock().unwrap().contains_key(short_key) {
402 true
403 } else {
404 let key_index = self
405 .context
406 .base_key()
407 .base_tag_index(KeyTag::Index as u8, short_key);
408 self.context.store().contains_key(&key_index).await?
409 })
410 }
411
412 /// Removes an entry. If absent then nothing happens.
413 /// ```rust
414 /// # tokio_test::block_on(async {
415 /// # use linera_views::context::MemoryContext;
416 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
417 /// # use linera_views::register_view::RegisterView;
418 /// # use linera_views::views::View;
419 /// # let context = MemoryContext::new_for_testing(());
420 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
421 /// ReentrantByteCollectionView::load(context).await.unwrap();
422 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
423 /// let value = subview.get_mut();
424 /// assert_eq!(*value, String::default());
425 /// view.remove_entry(vec![0, 1]);
426 /// let keys = view.keys().await.unwrap();
427 /// assert_eq!(keys.len(), 0);
428 /// # })
429 /// ```
430 pub fn remove_entry(&mut self, short_key: Vec<u8>) {
431 self.cached_entries.get_mut().unwrap().remove(&short_key);
432 if self.delete_storage_first {
433 // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
434 self.updates.remove(&short_key);
435 } else {
436 self.updates.insert(short_key, Update::Removed);
437 }
438 }
439
440 /// Marks the entry so that it is reset to the default value in the next flush.
441 /// ```rust
442 /// # tokio_test::block_on(async {
443 /// # use linera_views::context::MemoryContext;
444 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
445 /// # use linera_views::register_view::RegisterView;
446 /// # use linera_views::views::View;
447 /// # let context = MemoryContext::new_for_testing(());
448 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
449 /// ReentrantByteCollectionView::load(context).await.unwrap();
450 /// {
451 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
452 /// let value = subview.get_mut();
453 /// *value = String::from("Hello");
454 /// }
455 /// view.try_reset_entry_to_default(&[0, 1]).unwrap();
456 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
457 /// let value = subview.get_mut();
458 /// assert_eq!(*value, String::default());
459 /// # })
460 /// ```
461 pub fn try_reset_entry_to_default(&mut self, short_key: &[u8]) -> Result<(), ViewError> {
462 let key = self
463 .context
464 .base_key()
465 .base_tag_index(KeyTag::Subview as u8, short_key);
466 let context = self.context.clone_with_base_key(key);
467 let view = W::new(context)?;
468 let view = Arc::new(RwLock::new(view));
469 let view = Update::Set(view);
470 self.updates.insert(short_key.to_vec(), view);
471 self.cached_entries.get_mut().unwrap().remove(short_key);
472 Ok(())
473 }
474
475 /// Gets the extra data.
476 pub fn extra(&self) -> &<W::Context as Context>::Extra {
477 self.context.extra()
478 }
479}
480
481impl<W: View> ReentrantByteCollectionView<W::Context, W> {
482 /// Loads multiple entries for writing at once.
483 /// The keys in `short_keys` must all be distinct.
484 /// ```rust
485 /// # tokio_test::block_on(async {
486 /// # use linera_views::context::MemoryContext;
487 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
488 /// # use linera_views::register_view::RegisterView;
489 /// # use linera_views::views::View;
490 /// # let context = MemoryContext::new_for_testing(());
491 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
492 /// ReentrantByteCollectionView::load(context).await.unwrap();
493 /// {
494 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
495 /// *subview.get_mut() = "Bonjour".to_string();
496 /// }
497 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
498 /// let subviews = view.try_load_entries_mut(short_keys).await.unwrap();
499 /// let value1 = subviews[0].get();
500 /// let value2 = subviews[1].get();
501 /// assert_eq!(*value1, "Bonjour".to_string());
502 /// assert_eq!(*value2, String::default());
503 /// # })
504 /// ```
505 pub async fn try_load_entries_mut(
506 &mut self,
507 short_keys: Vec<Vec<u8>>,
508 ) -> Result<Vec<WriteGuardedView<W>>, ViewError> {
509 let cached_entries = self.cached_entries.get_mut().unwrap();
510 let mut short_keys_to_load = Vec::new();
511 let mut keys = Vec::new();
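        // Stage every requested key in `updates`: removed or cleared entries become fresh default
        // views, cached clean entries are promoted to staged entries, and the remaining keys are
        // batch-loaded from storage below.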
512 for short_key in &short_keys {
513 let key = self
514 .context
515 .base_key()
516 .base_tag_index(KeyTag::Subview as u8, short_key);
517 let context = self.context.clone_with_base_key(key);
518 match self.updates.entry(short_key.to_vec()) {
519 btree_map::Entry::Occupied(mut entry) => {
520 if let Update::Removed = entry.get() {
521 let view = W::new(context)?;
522 let view = Arc::new(RwLock::new(view));
523 entry.insert(Update::Set(view));
524 cached_entries.remove(short_key);
525 }
526 }
527 btree_map::Entry::Vacant(entry) => {
528 if self.delete_storage_first {
529 cached_entries.remove(short_key);
530 let view = W::new(context)?;
531 let view = Arc::new(RwLock::new(view));
532 entry.insert(Update::Set(view));
533 } else if let Some(view) = cached_entries.remove(short_key) {
534 entry.insert(Update::Set(view));
535 } else {
536 keys.extend(W::pre_load(&context)?);
537 short_keys_to_load.push(short_key.to_vec());
538 }
539 }
540 }
541 }
542 let values = self.context.store().read_multi_values_bytes(keys).await?;
543 for (loaded_values, short_key) in values
544 .chunks_exact(W::NUM_INIT_KEYS)
545 .zip(short_keys_to_load)
546 {
547 let key = self
548 .context
549 .base_key()
550 .base_tag_index(KeyTag::Subview as u8, &short_key);
551 let context = self.context.clone_with_base_key(key);
552 let view = W::post_load(context, loaded_values)?;
553 let wrapped_view = Arc::new(RwLock::new(view));
554 self.updates
555 .insert(short_key.to_vec(), Update::Set(wrapped_view));
556 }
557
558 short_keys
559 .into_iter()
560 .map(|short_key| {
561 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
562 unreachable!()
563 };
564 Ok(WriteGuardedView(
565 view.clone()
566 .try_write_arc()
567 .ok_or_else(|| ViewError::TryLockError(short_key))?,
568 ))
569 })
570 .collect()
571 }
572
573 /// Loads multiple entries for reading at once.
574 /// The keys in `short_keys` must all be distinct.
575 /// ```rust
576 /// # tokio_test::block_on(async {
577 /// # use linera_views::context::MemoryContext;
578 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
579 /// # use linera_views::register_view::RegisterView;
580 /// # use linera_views::views::View;
581 /// # let context = MemoryContext::new_for_testing(());
582 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
583 /// ReentrantByteCollectionView::load(context).await.unwrap();
584 /// {
585 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
586 /// }
587 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
588 /// let subviews = view.try_load_entries(short_keys).await.unwrap();
589 /// assert!(subviews[1].is_none());
590 /// let value0 = subviews[0].as_ref().unwrap().get();
591 /// assert_eq!(*value0, String::default());
592 /// # })
593 /// ```
594 pub async fn try_load_entries(
595 &self,
596 short_keys: Vec<Vec<u8>>,
597 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> {
598 let mut results = vec![None; short_keys.len()];
599 let mut keys_to_check = Vec::new();
600 let mut keys_to_check_metadata = Vec::new();
601
602 {
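            // Resolve each key from the staged updates first, then from the cache of clean
            // entries; only the remaining keys are checked against storage after this block.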
603 let cached_entries = self.cached_entries.lock().unwrap();
604 for (position, short_key) in short_keys.into_iter().enumerate() {
605 if let Some(update) = self.updates.get(&short_key) {
606 if let Update::Set(view) = update {
607 results[position] = Some((short_key, view.clone()));
608 }
609 } else if let Some(view) = cached_entries.get(&short_key) {
610 results[position] = Some((short_key, view.clone()));
611 } else if !self.delete_storage_first {
612 let key_index = self
613 .context
614 .base_key()
615 .base_tag_index(KeyTag::Index as u8, &short_key);
616 keys_to_check.push(key_index);
617 keys_to_check_metadata.push((position, short_key));
618 }
619 }
620 }
621
622 let found_keys = self.context.store().contains_keys(keys_to_check).await?;
623 let entries_to_load = keys_to_check_metadata
624 .into_iter()
625 .zip(found_keys)
626 .filter_map(|(metadata, found)| found.then_some(metadata))
627 .map(|(position, short_key)| {
628 let subview_key = self
629 .context
630 .base_key()
631 .base_tag_index(KeyTag::Subview as u8, &short_key);
632 let subview_context = self.context.clone_with_base_key(subview_key);
633 (position, short_key.to_owned(), subview_context)
634 })
635 .collect::<Vec<_>>();
636 if !entries_to_load.is_empty() {
637 let mut keys_to_load = Vec::with_capacity(entries_to_load.len() * W::NUM_INIT_KEYS);
638 for (_, _, context) in &entries_to_load {
639 keys_to_load.extend(W::pre_load(context)?);
640 }
641 let values = self
642 .context
643 .store()
644 .read_multi_values_bytes(keys_to_load)
645 .await?;
646 let mut cached_entries = self.cached_entries.lock().unwrap();
647 for (loaded_values, (position, short_key, context)) in
648 values.chunks_exact(W::NUM_INIT_KEYS).zip(entries_to_load)
649 {
650 let view = W::post_load(context, loaded_values)?;
651 let wrapped_view = Arc::new(RwLock::new(view));
652 cached_entries.insert(short_key.clone(), wrapped_view.clone());
653 results[position] = Some((short_key, wrapped_view));
654 }
655 }
656
657 results
658 .into_iter()
659 .map(|maybe_view| match maybe_view {
660 Some((short_key, view)) => Ok(Some(ReadGuardedView(
661 view.try_read_arc()
662 .ok_or_else(|| ViewError::TryLockError(short_key))?,
663 ))),
664 None => Ok(None),
665 })
666 .collect()
667 }
668
669 /// Loads all the entries for reading at once.
670 /// ```rust
671 /// # tokio_test::block_on(async {
672 /// # use linera_views::context::MemoryContext;
673 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
674 /// # use linera_views::register_view::RegisterView;
675 /// # use linera_views::views::View;
676 /// # let context = MemoryContext::new_for_testing(());
677 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
678 /// ReentrantByteCollectionView::load(context).await.unwrap();
679 /// {
680 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
681 /// }
682 /// let subviews = view.try_load_all_entries().await.unwrap();
683 /// assert_eq!(subviews.len(), 1);
684 /// # })
685 /// ```
686 pub async fn try_load_all_entries(
687 &self,
688 ) -> Result<Vec<(Vec<u8>, ReadGuardedView<W>)>, ViewError> {
689 let short_keys = self.keys().await?;
690 if !self.delete_storage_first {
691 let mut keys = Vec::new();
692 let mut short_keys_to_load = Vec::new();
693 {
694 let cached_entries = self.cached_entries.lock().unwrap();
695 for short_key in &short_keys {
696 if !self.updates.contains_key(short_key)
697 && !cached_entries.contains_key(short_key)
698 {
699 let key = self
700 .context
701 .base_key()
702 .base_tag_index(KeyTag::Subview as u8, short_key);
703 let context = self.context.clone_with_base_key(key);
704 keys.extend(W::pre_load(&context)?);
705 short_keys_to_load.push(short_key.to_vec());
706 }
707 }
708 }
709 let values = self.context.store().read_multi_values_bytes(keys).await?;
710 {
711 let mut cached_entries = self.cached_entries.lock().unwrap();
712 for (loaded_values, short_key) in values
713 .chunks_exact(W::NUM_INIT_KEYS)
714 .zip(short_keys_to_load)
715 {
716 let key = self
717 .context
718 .base_key()
719 .base_tag_index(KeyTag::Subview as u8, &short_key);
720 let context = self.context.clone_with_base_key(key);
721 let view = W::post_load(context, loaded_values)?;
722 let wrapped_view = Arc::new(RwLock::new(view));
723 cached_entries.insert(short_key.to_vec(), wrapped_view);
724 }
725 }
726 }
727 let cached_entries = self.cached_entries.lock().unwrap();
728 short_keys
729 .into_iter()
730 .map(|short_key| {
731 let view = if let Some(Update::Set(view)) = self.updates.get(&short_key) {
732 view.clone()
733 } else if let Some(view) = cached_entries.get(&short_key) {
734 view.clone()
735 } else {
736 unreachable!("All entries should have been loaded into memory");
737 };
738 let guard = ReadGuardedView(
739 view.try_read_arc()
740 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
741 );
742 Ok((short_key, guard))
743 })
744 .collect()
745 }
746
747 /// Loads all the entries for writing at once.
748 /// ```rust
749 /// # tokio_test::block_on(async {
750 /// # use linera_views::context::MemoryContext;
751 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
752 /// # use linera_views::register_view::RegisterView;
753 /// # use linera_views::views::View;
754 /// # let context = MemoryContext::new_for_testing(());
755 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
756 /// ReentrantByteCollectionView::load(context).await.unwrap();
757 /// {
758 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
759 /// }
760 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
761 /// assert_eq!(subviews.len(), 1);
762 /// # })
763 /// ```
764 pub async fn try_load_all_entries_mut(
765 &mut self,
766 ) -> Result<Vec<(Vec<u8>, WriteGuardedView<W>)>, ViewError> {
767 let short_keys = self.keys().await?;
768 if !self.delete_storage_first {
769 let mut keys = Vec::new();
770 let mut short_keys_to_load = Vec::new();
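            // Promote cached clean entries to staged updates and batch-load the remaining
            // sub-views from storage, so that every key ends up in `updates` before locking.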
771 {
772 let cached_entries = self.cached_entries.get_mut().unwrap();
773 for short_key in &short_keys {
774 if !self.updates.contains_key(short_key) {
775 if let Some(view) = cached_entries.remove(short_key) {
776 self.updates.insert(short_key.to_vec(), Update::Set(view));
777 } else {
778 let key = self
779 .context
780 .base_key()
781 .base_tag_index(KeyTag::Subview as u8, short_key);
782 let context = self.context.clone_with_base_key(key);
783 keys.extend(W::pre_load(&context)?);
784 short_keys_to_load.push(short_key.to_vec());
785 }
786 }
787 }
788 }
789 let values = self.context.store().read_multi_values_bytes(keys).await?;
790 for (loaded_values, short_key) in values
791 .chunks_exact(W::NUM_INIT_KEYS)
792 .zip(short_keys_to_load)
793 {
794 let key = self
795 .context
796 .base_key()
797 .base_tag_index(KeyTag::Subview as u8, &short_key);
798 let context = self.context.clone_with_base_key(key);
799 let view = W::post_load(context, loaded_values)?;
800 let wrapped_view = Arc::new(RwLock::new(view));
801 self.updates
802 .insert(short_key.to_vec(), Update::Set(wrapped_view));
803 }
804 }
805 short_keys
806 .into_iter()
807 .map(|short_key| {
808 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
809 unreachable!("All entries should have been loaded into `updates`")
810 };
811 let guard = WriteGuardedView(
812 view.clone()
813 .try_write_arc()
814 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
815 );
816 Ok((short_key, guard))
817 })
818 .collect()
819 }
820}
821
822impl<W: View> ReentrantByteCollectionView<W::Context, W> {
823 /// Returns the list of indices in the collection in lexicographic order.
824 /// ```rust
825 /// # tokio_test::block_on(async {
826 /// # use linera_views::context::MemoryContext;
827 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
828 /// # use linera_views::register_view::RegisterView;
829 /// # use linera_views::views::View;
830 /// # let context = MemoryContext::new_for_testing(());
831 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
832 /// ReentrantByteCollectionView::load(context).await.unwrap();
833 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
834 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
835 /// let keys = view.keys().await.unwrap();
836 /// assert_eq!(keys, vec![vec![0, 1], vec![0, 2]]);
837 /// # })
838 /// ```
839 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
840 let mut keys = Vec::new();
841 self.for_each_key(|key| {
842 keys.push(key.to_vec());
843 Ok(())
844 })
845 .await?;
846 Ok(keys)
847 }
848
849 /// Returns the number of indices of the collection.
850 /// ```rust
851 /// # tokio_test::block_on(async {
852 /// # use linera_views::context::MemoryContext;
853 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
854 /// # use linera_views::register_view::RegisterView;
855 /// # use linera_views::views::View;
856 /// # let context = MemoryContext::new_for_testing(());
857 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
858 /// ReentrantByteCollectionView::load(context).await.unwrap();
859 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
860 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
861 /// assert_eq!(view.count().await.unwrap(), 2);
862 /// # })
863 /// ```
864 pub async fn count(&self) -> Result<usize, ViewError> {
865 let mut count = 0;
866 self.for_each_key(|_key| {
867 count += 1;
868 Ok(())
869 })
870 .await?;
871 Ok(count)
872 }
873
874 /// Applies a function f on each index (aka key). Keys are visited in a
875 /// lexicographic order. If the function returns false then the loop
876 /// ends prematurely.
877 /// ```rust
878 /// # tokio_test::block_on(async {
879 /// # use linera_views::context::MemoryContext;
880 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
881 /// # use linera_views::register_view::RegisterView;
882 /// # use linera_views::views::View;
883 /// # let context = MemoryContext::new_for_testing(());
884 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
885 /// ReentrantByteCollectionView::load(context).await.unwrap();
886 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
887 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
888 /// let mut count = 0;
889 /// view.for_each_key_while(|_key| {
890 /// count += 1;
891 /// Ok(count < 1)
892 /// })
893 /// .await
894 /// .unwrap();
895 /// assert_eq!(count, 1);
896 /// # })
897 /// ```
898 pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
899 where
900 F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
901 {
902 let mut updates = self.updates.iter();
903 let mut update = updates.next();
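        // Merge the staged updates with the index keys found in storage, visiting all keys in
        // lexicographic order and skipping entries that are staged for removal.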
904 if !self.delete_storage_first {
905 let base = self.get_index_key(&[]);
906 for index in self
907 .context
908 .store()
909 .find_keys_by_prefix(&base)
910 .await?
911 .iterator()
912 {
913 let index = index?;
914 loop {
915 match update {
916 Some((key, value)) if key.as_slice() <= index => {
917 if let Update::Set(_) = value {
918 if !f(key)? {
919 return Ok(());
920 }
921 }
922 update = updates.next();
923 if key == index {
924 break;
925 }
926 }
927 _ => {
928 if !f(index)? {
929 return Ok(());
930 }
931 break;
932 }
933 }
934 }
935 }
936 }
937 while let Some((key, value)) = update {
938 if let Update::Set(_) = value {
939 if !f(key)? {
940 return Ok(());
941 }
942 }
943 update = updates.next();
944 }
945 Ok(())
946 }
947
948 /// Applies a function f on each index (aka key). Keys are visited in a
949 /// lexicographic order.
950 /// ```rust
951 /// # tokio_test::block_on(async {
952 /// # use linera_views::context::MemoryContext;
953 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
954 /// # use linera_views::register_view::RegisterView;
955 /// # use linera_views::views::View;
956 /// # let context = MemoryContext::new_for_testing(());
957 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
958 /// ReentrantByteCollectionView::load(context).await.unwrap();
959 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
960 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
961 /// let mut count = 0;
962 /// view.for_each_key(|_key| {
963 /// count += 1;
964 /// Ok(())
965 /// })
966 /// .await
967 /// .unwrap();
968 /// assert_eq!(count, 2);
969 /// # })
970 /// ```
971 pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
972 where
973 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
974 {
975 self.for_each_key_while(|key| {
976 f(key)?;
977 Ok(true)
978 })
979 .await
980 }
981}
982
983impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W> {
984 type Hasher = sha3::Sha3_256;
985
986 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
987 #[cfg(with_metrics)]
988 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
989 let mut hasher = sha3::Sha3_256::default();
990 let keys = self.keys().await?;
991 let count = keys.len() as u32;
992 hasher.update_with_bcs_bytes(&count)?;
993 let cached_entries = self.cached_entries.get_mut().unwrap();
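        // For each key, hash the staged update if there is one, otherwise the cached entry,
        // otherwise a fresh copy of the sub-view loaded from storage.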
994 for key in keys {
995 hasher.update_with_bytes(&key)?;
996 let hash = if let Some(entry) = self.updates.get_mut(&key) {
997 let Update::Set(view) = entry else {
998 unreachable!();
999 };
1000 let mut view = view
1001 .try_write_arc()
1002 .ok_or_else(|| ViewError::TryLockError(key))?;
1003 view.hash_mut().await?
1004 } else if let Some(view) = cached_entries.get_mut(&key) {
1005 let mut view = view
1006 .try_write_arc()
1007 .ok_or_else(|| ViewError::TryLockError(key))?;
1008 view.hash_mut().await?
1009 } else {
1010 let key = self
1011 .context
1012 .base_key()
1013 .base_tag_index(KeyTag::Subview as u8, &key);
1014 let context = self.context.clone_with_base_key(key);
1015 let mut view = W::load(context).await?;
1016 view.hash_mut().await?
1017 };
1018 hasher.write_all(hash.as_ref())?;
1019 }
1020 Ok(hasher.finalize())
1021 }
1022
1023 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1024 #[cfg(with_metrics)]
1025 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
1026 let mut hasher = sha3::Sha3_256::default();
1027 let keys = self.keys().await?;
1028 let count = keys.len() as u32;
1029 hasher.update_with_bcs_bytes(&count)?;
1030 let mut cached_entries_result = Vec::new();
1031 {
1032 let cached_entries = self.cached_entries.lock().unwrap();
1033 for key in &keys {
1034 cached_entries_result.push(cached_entries.get(key).cloned());
1035 }
1036 }
1037 for (key, cached_entry) in keys.into_iter().zip(cached_entries_result) {
1038 hasher.update_with_bytes(&key)?;
1039 let hash = if let Some(entry) = self.updates.get(&key) {
1040 let Update::Set(view) = entry else {
1041 unreachable!();
1042 };
1043 let view = view
1044 .try_read_arc()
1045 .ok_or_else(|| ViewError::TryLockError(key))?;
1046 view.hash().await?
1047 } else if let Some(view) = cached_entry {
1048 let view = view
1049 .try_read_arc()
1050 .ok_or_else(|| ViewError::TryLockError(key))?;
1051 view.hash().await?
1052 } else {
1053 let key = self
1054 .context
1055 .base_key()
1056 .base_tag_index(KeyTag::Subview as u8, &key);
1057 let context = self.context.clone_with_base_key(key);
1058 let view = W::load(context).await?;
1059 view.hash().await?
1060 };
1061 hasher.write_all(hash.as_ref())?;
1062 }
1063 Ok(hasher.finalize())
1064 }
1065}
1066
1067/// A view that supports accessing a collection of views of the same kind, indexed by keys,
1068/// possibly several subviews at a time.
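/// A minimal usage sketch, mirroring the doctests of the methods below and assuming the
/// in-memory test context they use:
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
///     ReentrantCollectionView::load(context).await.unwrap();
/// {
///     let mut subview = view.try_load_entry_mut(&23).await.unwrap();
///     *subview.get_mut() = "Hello".to_string();
/// }
/// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
/// assert_eq!(*subview.get(), "Hello".to_string());
/// # })
/// ```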
1069#[derive(Debug)]
1070pub struct ReentrantCollectionView<C, I, W> {
1071 collection: ReentrantByteCollectionView<C, W>,
1072 _phantom: PhantomData<I>,
1073}
1074
1075impl<I, W> View for ReentrantCollectionView<W::Context, I, W>
1076where
1077 W: View,
1078 I: Send + Sync + Serialize + DeserializeOwned,
1079{
1080 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1081
1082 type Context = W::Context;
1083
1084 fn context(&self) -> &Self::Context {
1085 self.collection.context()
1086 }
1087
1088 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1089 ReentrantByteCollectionView::<W::Context, W>::pre_load(context)
1090 }
1091
1092 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1093 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1094 Ok(ReentrantCollectionView {
1095 collection,
1096 _phantom: PhantomData,
1097 })
1098 }
1099
1100 async fn load(context: Self::Context) -> Result<Self, ViewError> {
1101 Self::post_load(context, &[])
1102 }
1103
1104 fn rollback(&mut self) {
1105 self.collection.rollback()
1106 }
1107
1108 async fn has_pending_changes(&self) -> bool {
1109 self.collection.has_pending_changes().await
1110 }
1111
1112 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1113 self.collection.flush(batch)
1114 }
1115
1116 fn clear(&mut self) {
1117 self.collection.clear()
1118 }
1119}
1120
1121impl<I, W> ClonableView for ReentrantCollectionView<W::Context, I, W>
1122where
1123 W: ClonableView,
1124 I: Send + Sync + Serialize + DeserializeOwned,
1125{
1126 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
1127 Ok(ReentrantCollectionView {
1128 collection: self.collection.clone_unchecked()?,
1129 _phantom: PhantomData,
1130 })
1131 }
1132}
1133
1134impl<I, W> ReentrantCollectionView<W::Context, I, W>
1135where
1136 W: View,
1137 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1138{
1139 /// Loads a subview for the data at the given index in the collection. If an entry
1140 /// is absent then a default entry is added to the collection. The obtained view can
1141 /// then be modified.
1142 /// ```rust
1143 /// # tokio_test::block_on(async {
1144 /// # use linera_views::context::MemoryContext;
1145 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1146 /// # use linera_views::register_view::RegisterView;
1147 /// # use linera_views::views::View;
1148 /// # let context = MemoryContext::new_for_testing(());
1149 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1150 /// ReentrantCollectionView::load(context).await.unwrap();
1151 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1152 /// let value = subview.get();
1153 /// assert_eq!(*value, String::default());
1154 /// # })
1155 /// ```
1156 pub async fn try_load_entry_mut<Q>(
1157 &mut self,
1158 index: &Q,
1159 ) -> Result<WriteGuardedView<W>, ViewError>
1160 where
1161 I: Borrow<Q>,
1162 Q: Serialize + ?Sized,
1163 {
1164 let short_key = BaseKey::derive_short_key(index)?;
1165 self.collection.try_load_entry_mut(&short_key).await
1166 }
1167
1168 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1169 /// If an entry is absent then `None` is returned.
1170 /// ```rust
1171 /// # tokio_test::block_on(async {
1172 /// # use linera_views::context::MemoryContext;
1173 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1174 /// # use linera_views::register_view::RegisterView;
1175 /// # use linera_views::views::View;
1176 /// # let context = MemoryContext::new_for_testing(());
1177 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1178 /// ReentrantCollectionView::load(context).await.unwrap();
1179 /// {
1180 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1181 /// }
1182 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1183 /// let value = subview.get();
1184 /// assert_eq!(*value, String::default());
1185 /// # })
1186 /// ```
1187 pub async fn try_load_entry<Q>(
1188 &self,
1189 index: &Q,
1190 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1191 where
1192 I: Borrow<Q>,
1193 Q: Serialize + ?Sized,
1194 {
1195 let short_key = BaseKey::derive_short_key(index)?;
1196 self.collection.try_load_entry(&short_key).await
1197 }
1198
1199 /// Returns `true` if the collection contains a value for the specified key.
1200 /// ```rust
1201 /// # tokio_test::block_on(async {
1202 /// # use linera_views::context::MemoryContext;
1203 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1204 /// # use linera_views::register_view::RegisterView;
1205 /// # use linera_views::views::View;
1206 /// # let context = MemoryContext::new_for_testing(());
1207 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1208 /// ReentrantCollectionView::load(context).await.unwrap();
1209 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1210 /// assert!(view.contains_key(&23).await.unwrap());
1211 /// assert!(!view.contains_key(&24).await.unwrap());
1212 /// # })
1213 /// ```
1214 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1215 where
1216 I: Borrow<Q>,
1217 Q: Serialize + ?Sized,
1218 {
1219 let short_key = BaseKey::derive_short_key(index)?;
1220 self.collection.contains_key(&short_key).await
1221 }
1222
1223 /// Marks the entry so that it is removed in the next flush.
1224 /// ```rust
1225 /// # tokio_test::block_on(async {
1226 /// # use linera_views::context::MemoryContext;
1227 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1228 /// # use linera_views::register_view::RegisterView;
1229 /// # use linera_views::views::View;
1230 /// # let context = MemoryContext::new_for_testing(());
1231 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1232 /// ReentrantCollectionView::load(context).await.unwrap();
1233 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1234 /// let value = subview.get_mut();
1235 /// assert_eq!(*value, String::default());
1236 /// view.remove_entry(&23);
1237 /// let keys = view.indices().await.unwrap();
1238 /// assert_eq!(keys.len(), 0);
1239 /// # })
1240 /// ```
1241 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1242 where
1243 I: Borrow<Q>,
1244 Q: Serialize + ?Sized,
1245 {
1246 let short_key = BaseKey::derive_short_key(index)?;
1247 self.collection.remove_entry(short_key);
1248 Ok(())
1249 }
1250
1251 /// Marks the entry so that it is reset to the default value in the next flush.
1252 /// ```rust
1253 /// # tokio_test::block_on(async {
1254 /// # use linera_views::context::MemoryContext;
1255 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1256 /// # use linera_views::register_view::RegisterView;
1257 /// # use linera_views::views::View;
1258 /// # let context = MemoryContext::new_for_testing(());
1259 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1260 /// ReentrantCollectionView::load(context).await.unwrap();
1261 /// {
1262 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1263 /// let value = subview.get_mut();
1264 /// *value = String::from("Hello");
1265 /// }
1266 /// view.try_reset_entry_to_default(&23).unwrap();
1267 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1268 /// let value = subview.get_mut();
1269 /// assert_eq!(*value, String::default());
1270 /// # })
1271 /// ```
1272 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1273 where
1274 I: Borrow<Q>,
1275 Q: Serialize + ?Sized,
1276 {
1277 let short_key = BaseKey::derive_short_key(index)?;
1278 self.collection.try_reset_entry_to_default(&short_key)
1279 }
1280
1281 /// Gets the extra data.
1282 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1283 self.collection.extra()
1284 }
1285}
1286
1287impl<I, W> ReentrantCollectionView<W::Context, I, W>
1288where
1289 W: View,
1290 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1291{
1292 /// Loads multiple entries for writing at once.
1293 /// The given indices must all be distinct.
1294 /// ```rust
1295 /// # tokio_test::block_on(async {
1296 /// # use linera_views::context::MemoryContext;
1297 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1298 /// # use linera_views::register_view::RegisterView;
1299 /// # use linera_views::views::View;
1300 /// # let context = MemoryContext::new_for_testing(());
1301 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1302 /// ReentrantCollectionView::load(context).await.unwrap();
1303 /// let indices = vec![23, 42];
1304 /// let subviews = view.try_load_entries_mut(&indices).await.unwrap();
1305 /// let value1 = subviews[0].get();
1306 /// let value2 = subviews[1].get();
1307 /// assert_eq!(*value1, String::default());
1308 /// assert_eq!(*value2, String::default());
1309 /// # })
1310 /// ```
1311 pub async fn try_load_entries_mut<'a, Q>(
1312 &'a mut self,
1313 indices: impl IntoIterator<Item = &'a Q>,
1314 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1315 where
1316 I: Borrow<Q>,
1317 Q: Serialize + 'a,
1318 {
1319 let short_keys = indices
1320 .into_iter()
1321 .map(|index| BaseKey::derive_short_key(index))
1322 .collect::<Result<_, _>>()?;
1323 self.collection.try_load_entries_mut(short_keys).await
1324 }
1325
1326 /// Loads multiple entries for reading at once.
1327 /// The given indices must all be distinct.
1328 /// ```rust
1329 /// # tokio_test::block_on(async {
1330 /// # use linera_views::context::MemoryContext;
1331 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1332 /// # use linera_views::register_view::RegisterView;
1333 /// # use linera_views::views::View;
1334 /// # let context = MemoryContext::new_for_testing(());
1335 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1336 /// ReentrantCollectionView::load(context).await.unwrap();
1337 /// {
1338 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1339 /// }
1340 /// let indices = vec![23, 42];
1341 /// let subviews = view.try_load_entries(&indices).await.unwrap();
1342 /// assert!(subviews[1].is_none());
1343 /// let value0 = subviews[0].as_ref().unwrap().get();
1344 /// assert_eq!(*value0, String::default());
1345 /// # })
1346 /// ```
1347 pub async fn try_load_entries<'a, Q>(
1348 &'a self,
1349 indices: impl IntoIterator<Item = &'a Q>,
1350 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1351 where
1352 I: Borrow<Q>,
1353 Q: Serialize + 'a,
1354 {
1355 let short_keys = indices
1356 .into_iter()
1357 .map(|index| BaseKey::derive_short_key(index))
1358 .collect::<Result<_, _>>()?;
1359 self.collection.try_load_entries(short_keys).await
1360 }
1361
1362 /// Loads all the entries for writing at once.
1364 /// ```rust
1365 /// # tokio_test::block_on(async {
1366 /// # use linera_views::context::MemoryContext;
1367 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1368 /// # use linera_views::register_view::RegisterView;
1369 /// # use linera_views::views::View;
1370 /// # let context = MemoryContext::new_for_testing(());
1371 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1372 /// ReentrantCollectionView::load(context).await.unwrap();
1373 /// {
1374 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1375 /// }
1376 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
1377 /// assert_eq!(subviews.len(), 1);
1378 /// # })
1379 /// ```
1380 pub async fn try_load_all_entries_mut<'a, Q>(
1381 &'a mut self,
1382 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError>
1383 where
1384 I: Borrow<Q>,
1385 Q: Serialize + 'a,
1386 {
1387 let results = self.collection.try_load_all_entries_mut().await?;
1388 results
1389 .into_iter()
1390 .map(|(short_key, view)| {
1391 let index = BaseKey::deserialize_value(&short_key)?;
1392 Ok((index, view))
1393 })
1394 .collect()
1395 }
1396
1397 /// Loads all the entries for reading at once.
1399 /// ```rust
1400 /// # tokio_test::block_on(async {
1401 /// # use linera_views::context::MemoryContext;
1402 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1403 /// # use linera_views::register_view::RegisterView;
1404 /// # use linera_views::views::View;
1405 /// # let context = MemoryContext::new_for_testing(());
1406 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1407 /// ReentrantCollectionView::load(context).await.unwrap();
1408 /// {
1409 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1410 /// }
1411 /// let subviews = view.try_load_all_entries().await.unwrap();
1412 /// assert_eq!(subviews.len(), 1);
1413 /// # })
1414 /// ```
1415 pub async fn try_load_all_entries<'a, Q>(
1416 &'a self,
1417 ) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError>
1418 where
1419 I: Borrow<Q>,
1420 Q: Serialize + 'a,
1421 {
1422 let results = self.collection.try_load_all_entries().await?;
1423 results
1424 .into_iter()
1425 .map(|(short_key, view)| {
1426 let index = BaseKey::deserialize_value(&short_key)?;
1427 Ok((index, view))
1428 })
1429 .collect()
1430 }
1431}
1432
1433impl<I, W> ReentrantCollectionView<W::Context, I, W>
1434where
1435 W: View,
1436 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1437{
1438 /// Returns the list of indices in the collection in an order determined
1439 /// by serialization.
1440 /// ```rust
1441 /// # tokio_test::block_on(async {
1442 /// # use linera_views::context::MemoryContext;
1443 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1444 /// # use linera_views::register_view::RegisterView;
1445 /// # use linera_views::views::View;
1446 /// # let context = MemoryContext::new_for_testing(());
1447 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1448 /// ReentrantCollectionView::load(context).await.unwrap();
1449 /// view.try_load_entry_mut(&23).await.unwrap();
1450 /// view.try_load_entry_mut(&25).await.unwrap();
1451 /// let indices = view.indices().await.unwrap();
1452 /// assert_eq!(indices.len(), 2);
1453 /// # })
1454 /// ```
1455 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1456 let mut indices = Vec::new();
1457 self.for_each_index(|index| {
1458 indices.push(index);
1459 Ok(())
1460 })
1461 .await?;
1462 Ok(indices)
1463 }
1464
1465 /// Returns the number of indices in the collection.
1466 /// ```rust
1467 /// # tokio_test::block_on(async {
1468 /// # use linera_views::context::MemoryContext;
1469 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1470 /// # use linera_views::register_view::RegisterView;
1471 /// # use linera_views::views::View;
1472 /// # let context = MemoryContext::new_for_testing(());
1473 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1474 /// ReentrantCollectionView::load(context).await.unwrap();
1475 /// view.try_load_entry_mut(&23).await.unwrap();
1476 /// view.try_load_entry_mut(&25).await.unwrap();
1477 /// assert_eq!(view.count().await.unwrap(), 2);
1478 /// # })
1479 /// ```
1480 pub async fn count(&self) -> Result<usize, ViewError> {
1481 self.collection.count().await
1482 }
1483
1484 /// Applies a function f on each index. Indices are visited in an order
1485 /// determined by the serialization. If the function f returns false then
1486 /// the loop ends prematurely.
1487 /// ```rust
1488 /// # tokio_test::block_on(async {
1489 /// # use linera_views::context::MemoryContext;
1490 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1491 /// # use linera_views::register_view::RegisterView;
1492 /// # use linera_views::views::View;
1493 /// # let context = MemoryContext::new_for_testing(());
1494 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1495 /// ReentrantCollectionView::load(context).await.unwrap();
1496 /// view.try_load_entry_mut(&23).await.unwrap();
1497 /// view.try_load_entry_mut(&24).await.unwrap();
1498 /// let mut count = 0;
1499 /// view.for_each_index_while(|_key| {
1500 /// count += 1;
1501 /// Ok(count < 1)
1502 /// })
1503 /// .await
1504 /// .unwrap();
1505 /// assert_eq!(count, 1);
1506 /// # })
1507 /// ```
1508 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
1509 where
1510 F: FnMut(I) -> Result<bool, ViewError> + Send,
1511 {
1512 self.collection
1513 .for_each_key_while(|key| {
1514 let index = BaseKey::deserialize_value(key)?;
1515 f(index)
1516 })
1517 .await?;
1518 Ok(())
1519 }
1520
1521 /// Applies a function f on each index. Indices are visited in an order
1522 /// determined by the serialization.
1523 /// ```rust
1524 /// # tokio_test::block_on(async {
1525 /// # use linera_views::context::MemoryContext;
1526 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1527 /// # use linera_views::register_view::RegisterView;
1528 /// # use linera_views::views::View;
1529 /// # let context = MemoryContext::new_for_testing(());
1530 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1531 /// ReentrantCollectionView::load(context).await.unwrap();
1532 /// view.try_load_entry_mut(&23).await.unwrap();
1533 /// view.try_load_entry_mut(&28).await.unwrap();
1534 /// let mut count = 0;
1535 /// view.for_each_index(|_key| {
1536 /// count += 1;
1537 /// Ok(())
1538 /// })
1539 /// .await
1540 /// .unwrap();
1541 /// assert_eq!(count, 2);
1542 /// # })
1543 /// ```
1544 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
1545 where
1546 F: FnMut(I) -> Result<(), ViewError> + Send,
1547 {
1548 self.collection
1549 .for_each_key(|key| {
1550 let index = BaseKey::deserialize_value(key)?;
1551 f(index)
1552 })
1553 .await?;
1554 Ok(())
1555 }
1556}
1557
1558impl<I, W> HashableView for ReentrantCollectionView<W::Context, I, W>
1559where
1560 W: HashableView,
1561 I: Send + Sync + Serialize + DeserializeOwned,
1562{
1563 type Hasher = sha3::Sha3_256;
1564
1565 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1566 self.collection.hash_mut().await
1567 }
1568
1569 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1570 self.collection.hash().await
1571 }
1572}
1573
1574/// A view that supports accessing a collection of views of the same kind, indexed by an ordered key,
1575/// possibly several subviews at a time.
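/// A minimal usage sketch, mirroring the doctests of the methods below; the index type must
/// implement `CustomSerialize`, as `u128` does here:
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
///     ReentrantCustomCollectionView::load(context).await.unwrap();
/// let subview = view.try_load_entry_mut(&23).await.unwrap();
/// assert_eq!(*subview.get(), String::default());
/// # })
/// ```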
1576#[derive(Debug)]
1577pub struct ReentrantCustomCollectionView<C, I, W> {
1578 collection: ReentrantByteCollectionView<C, W>,
1579 _phantom: PhantomData<I>,
1580}
1581
1582impl<I, W> View for ReentrantCustomCollectionView<W::Context, I, W>
1583where
1584 W: View,
1585 I: Send + Sync + CustomSerialize,
1586{
1587 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1588
1589 type Context = W::Context;
1590
1591 fn context(&self) -> &Self::Context {
1592 self.collection.context()
1593 }
1594
1595 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1596 ReentrantByteCollectionView::<_, W>::pre_load(context)
1597 }
1598
1599 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1600 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1601 Ok(ReentrantCustomCollectionView {
1602 collection,
1603 _phantom: PhantomData,
1604 })
1605 }
1606
1607 async fn load(context: Self::Context) -> Result<Self, ViewError> {
1608 Self::post_load(context, &[])
1609 }
1610
1611 fn rollback(&mut self) {
1612 self.collection.rollback()
1613 }
1614
1615 async fn has_pending_changes(&self) -> bool {
1616 self.collection.has_pending_changes().await
1617 }
1618
1619 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1620 self.collection.flush(batch)
1621 }
1622
1623 fn clear(&mut self) {
1624 self.collection.clear()
1625 }
1626}
1627
1628impl<I, W> ClonableView for ReentrantCustomCollectionView<W::Context, I, W>
1629where
1630 W: ClonableView,
1631 Self: View,
1632{
1633 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
1634 Ok(ReentrantCustomCollectionView {
1635 collection: self.collection.clone_unchecked()?,
1636 _phantom: PhantomData,
1637 })
1638 }
1639}
1640
1641impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
1642where
1643 W: View,
1644 I: Sync + Clone + Send + CustomSerialize,
1645{
1646 /// Loads a subview for the data at the given index in the collection. If an entry
1647 /// is absent, then a default entry is put in the collection at this index.
1648 /// ```rust
1649 /// # tokio_test::block_on(async {
1650 /// # use linera_views::context::MemoryContext;
1651 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1652 /// # use linera_views::register_view::RegisterView;
1653 /// # use linera_views::views::View;
1654 /// # let context = MemoryContext::new_for_testing(());
1655 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1656 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1657 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1658 /// let value = subview.get();
1659 /// assert_eq!(*value, String::default());
1660 /// # })
1661 /// ```
1662 pub async fn try_load_entry_mut<Q>(
1663 &mut self,
1664 index: &Q,
1665 ) -> Result<WriteGuardedView<W>, ViewError>
1666 where
1667 I: Borrow<Q>,
1668 Q: CustomSerialize,
1669 {
1670 let short_key = index.to_custom_bytes()?;
1671 self.collection.try_load_entry_mut(&short_key).await
1672 }
1673
1674 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1675 /// If an entry is absent then `None` is returned.
1676 /// ```rust
1677 /// # tokio_test::block_on(async {
1678 /// # use linera_views::context::MemoryContext;
1679 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1680 /// # use linera_views::register_view::RegisterView;
1681 /// # use linera_views::views::View;
1682 /// # let context = MemoryContext::new_for_testing(());
1683 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1684 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1685 /// {
1686 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1687 /// }
1688 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1689 /// let value = subview.get();
1690 /// assert_eq!(*value, String::default());
1691 /// # })
1692 /// ```
1693 pub async fn try_load_entry<Q>(
1694 &self,
1695 index: &Q,
1696 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1697 where
1698 I: Borrow<Q>,
1699 Q: CustomSerialize,
1700 {
1701 let short_key = index.to_custom_bytes()?;
1702 self.collection.try_load_entry(&short_key).await
1703 }
1704
1705 /// Returns `true` if the collection contains a value for the specified key.
1706 /// ```rust
1707 /// # tokio_test::block_on(async {
1708 /// # use linera_views::context::MemoryContext;
1709 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1710 /// # use linera_views::register_view::RegisterView;
1711 /// # use linera_views::views::View;
1712 /// # let context = MemoryContext::new_for_testing(());
1713 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1714 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1715 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1716 /// assert!(view.contains_key(&23).await.unwrap());
1717 /// assert!(!view.contains_key(&24).await.unwrap());
1718 /// # })
1719 /// ```
1720 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1721 where
1722 I: Borrow<Q>,
1723 Q: CustomSerialize,
1724 {
1725 let short_key = index.to_custom_bytes()?;
1726 self.collection.contains_key(&short_key).await
1727 }
1728
1729 /// Removes an entry. If absent then nothing happens.
1730 /// ```rust
1731 /// # tokio_test::block_on(async {
1732 /// # use linera_views::context::MemoryContext;
1733 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1734 /// # use linera_views::register_view::RegisterView;
1735 /// # use linera_views::views::View;
1736 /// # let context = MemoryContext::new_for_testing(());
1737 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1738 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1739 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1740 /// let value = subview.get_mut();
1741 /// assert_eq!(*value, String::default());
1742 /// view.remove_entry(&23).unwrap();
1743 /// let keys = view.indices().await.unwrap();
1744 /// assert_eq!(keys.len(), 0);
1745 /// # })
1746 /// ```
1747 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1748 where
1749 I: Borrow<Q>,
1750 Q: CustomSerialize,
1751 {
1752 let short_key = index.to_custom_bytes()?;
1753 self.collection.remove_entry(short_key);
1754 Ok(())
1755 }
1756
1757 /// Marks the entry so that it is reset to the default value in the next flush.
1758 /// ```rust
1759 /// # tokio_test::block_on(async {
1760 /// # use linera_views::context::MemoryContext;
1761 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1762 /// # use linera_views::register_view::RegisterView;
1763 /// # use linera_views::views::View;
1764 /// # let context = MemoryContext::new_for_testing(());
1765 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1766 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1767 /// {
1768 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1769 /// let value = subview.get_mut();
1770 /// *value = String::from("Hello");
1771 /// }
1772 /// {
1773 /// view.try_reset_entry_to_default(&23).unwrap();
1774 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1775 /// let value = subview.get();
1776 /// assert_eq!(*value, String::default());
1777 /// }
1778 /// # })
1779 /// ```
1780 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1781 where
1782 I: Borrow<Q>,
1783 Q: CustomSerialize,
1784 {
1785 let short_key = index.to_custom_bytes()?;
1786 self.collection.try_reset_entry_to_default(&short_key)
1787 }
1788
1789 /// Gets the extra data.
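    /// A minimal sketch, assuming the in-memory test context, whose extra data is simply `()`:
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
    /// # use linera_views::register_view::RegisterView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
    ///     ReentrantCustomCollectionView::load(context).await.unwrap();
    /// let _extra: &() = view.extra();
    /// # })
    /// ```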
1790 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1791 self.collection.extra()
1792 }
1793}
1794
1795impl<I, W: View> ReentrantCustomCollectionView<W::Context, I, W>
1796where
1797 I: Sync + Clone + Send + CustomSerialize,
1798{
1799 /// Loads multiple entries for writing at once.
1800 /// The given indices must all be distinct.
1801 /// ```rust
1802 /// # tokio_test::block_on(async {
1803 /// # use linera_views::context::MemoryContext;
1804 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1805 /// # use linera_views::register_view::RegisterView;
1806 /// # use linera_views::views::View;
1807 /// # let context = MemoryContext::new_for_testing(());
1808 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1809 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1810 /// let indices = vec![23, 42];
1811 /// let subviews = view.try_load_entries_mut(indices).await.unwrap();
1812 /// let value1 = subviews[0].get();
1813 /// let value2 = subviews[1].get();
1814 /// assert_eq!(*value1, String::default());
1815 /// assert_eq!(*value2, String::default());
1816 /// # })
1817 /// ```
1818 pub async fn try_load_entries_mut<Q>(
1819 &mut self,
1820 indices: impl IntoIterator<Item = Q>,
1821 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1822 where
1823 I: Borrow<Q>,
1824 Q: CustomSerialize,
1825 {
1826 let short_keys = indices
1827 .into_iter()
1828 .map(|index| index.to_custom_bytes())
1829 .collect::<Result<_, _>>()?;
1830 self.collection.try_load_entries_mut(short_keys).await
1831 }
1832
1833 /// Loads multiple entries for reading at once.
1834 /// The given indices must all be distinct.
1835 /// ```rust
1836 /// # tokio_test::block_on(async {
1837 /// # use linera_views::context::MemoryContext;
1838 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1839 /// # use linera_views::register_view::RegisterView;
1840 /// # use linera_views::views::View;
1841 /// # let context = MemoryContext::new_for_testing(());
1842 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1843 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1844 /// {
1845 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1846 /// }
1847 /// let indices = vec![23, 42];
1848 /// let subviews = view.try_load_entries(indices).await.unwrap();
1849 /// assert!(subviews[1].is_none());
1850 /// let value0 = subviews[0].as_ref().unwrap().get();
1851 /// assert_eq!(*value0, String::default());
1852 /// # })
1853 /// ```
1854 pub async fn try_load_entries<Q>(
1855 &self,
1856 indices: impl IntoIterator<Item = Q>,
1857 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1858 where
1859 I: Borrow<Q>,
1860 Q: CustomSerialize,
1861 {
1862 let short_keys = indices
1863 .into_iter()
1864 .map(|index| index.to_custom_bytes())
1865 .collect::<Result<_, _>>()?;
1866 self.collection.try_load_entries(short_keys).await
1867 }
1868
1869 /// Loads all the entries for writing at once.
1870 /// Each entry is returned together with its index.
1871 /// ```rust
1872 /// # tokio_test::block_on(async {
1873 /// # use linera_views::context::MemoryContext;
1874 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1875 /// # use linera_views::register_view::RegisterView;
1876 /// # use linera_views::views::View;
1877 /// # let context = MemoryContext::new_for_testing(());
1878 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1879 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1880 /// {
1881 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1882 /// }
1883 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
1884 /// assert_eq!(subviews.len(), 1);
1885 /// # })
1886 /// ```
1887 pub async fn try_load_all_entries_mut(
1888 &mut self,
1889 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError> {
1894 let results = self.collection.try_load_all_entries_mut().await?;
1895 results
1896 .into_iter()
1897 .map(|(short_key, view)| {
1898 let index = I::from_custom_bytes(&short_key)?;
1899 Ok((index, view))
1900 })
1901 .collect()
1902 }
1903
1904 /// Loads all the entries for reading at once.
1905 /// Each entry is returned together with its index.
1906 /// ```rust
1907 /// # tokio_test::block_on(async {
1908 /// # use linera_views::context::MemoryContext;
1909 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1910 /// # use linera_views::register_view::RegisterView;
1911 /// # use linera_views::views::View;
1912 /// # let context = MemoryContext::new_for_testing(());
1913 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1914 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1915 /// {
1916 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1917 /// }
1918 /// let subviews = view.try_load_all_entries().await.unwrap();
1919 /// assert_eq!(subviews.len(), 1);
1920 /// # })
1921 /// ```
1922 pub async fn try_load_all_entries(&self) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError> {
1927 let results = self.collection.try_load_all_entries().await?;
1928 results
1929 .into_iter()
1930 .map(|(short_key, view)| {
1931 let index = I::from_custom_bytes(&short_key)?;
1932 Ok((index, view))
1933 })
1934 .collect()
1935 }
1936}
1937
1938impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
1939where
1940 W: View,
1941 I: Sync + Clone + Send + CustomSerialize,
1942{
1943 /// Returns the list of indices in the collection. The order is determined by
1944 /// the custom serialization.
1945 /// ```rust
1946 /// # tokio_test::block_on(async {
1947 /// # use linera_views::context::MemoryContext;
1948 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1949 /// # use linera_views::register_view::RegisterView;
1950 /// # use linera_views::views::View;
1951 /// # let context = MemoryContext::new_for_testing(());
1952 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1953 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1954 /// view.try_load_entry_mut(&23).await.unwrap();
1955 /// view.try_load_entry_mut(&25).await.unwrap();
1956 /// let indices = view.indices().await.unwrap();
1957 /// assert_eq!(indices, vec![23, 25]);
1958 /// # })
1959 /// ```
1960 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1961 let mut indices = Vec::new();
1962 self.for_each_index(|index| {
1963 indices.push(index);
1964 Ok(())
1965 })
1966 .await?;
1967 Ok(indices)
1968 }
1969
1970 /// Returns the number of entries in the collection.
1971 /// ```rust
1972 /// # tokio_test::block_on(async {
1973 /// # use linera_views::context::MemoryContext;
1974 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1975 /// # use linera_views::register_view::RegisterView;
1976 /// # use linera_views::views::View;
1977 /// # let context = MemoryContext::new_for_testing(());
1978 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1979 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1980 /// view.try_load_entry_mut(&23).await.unwrap();
1981 /// view.try_load_entry_mut(&25).await.unwrap();
1982 /// assert_eq!(view.count().await.unwrap(), 2);
1983 /// # })
1984 /// ```
1985 pub async fn count(&self) -> Result<usize, ViewError> {
1986 self.collection.count().await
1987 }
1988
1989 /// Applies a function f on each index. Indices are visited in an order
1990 /// determined by the custom serialization. If the function f returns false
1991 /// then the loop ends prematurely.
1992 /// ```rust
1993 /// # tokio_test::block_on(async {
1994 /// # use linera_views::context::MemoryContext;
1995 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1996 /// # use linera_views::register_view::RegisterView;
1997 /// # use linera_views::views::View;
1998 /// # let context = MemoryContext::new_for_testing(());
1999 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2000 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2001 /// view.try_load_entry_mut(&28).await.unwrap();
2002 /// view.try_load_entry_mut(&24).await.unwrap();
2003 /// view.try_load_entry_mut(&23).await.unwrap();
2004 /// let mut part_indices = Vec::new();
2005 /// view.for_each_index_while(|index| {
2006 /// part_indices.push(index);
2007 /// Ok(part_indices.len() < 2)
2008 /// })
2009 /// .await
2010 /// .unwrap();
2011 /// assert_eq!(part_indices, vec![23, 24]);
2012 /// # })
2013 /// ```
2014 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
2015 where
2016 F: FnMut(I) -> Result<bool, ViewError> + Send,
2017 {
2018 self.collection
2019 .for_each_key_while(|key| {
2020 let index = I::from_custom_bytes(key)?;
2021 f(index)
2022 })
2023 .await?;
2024 Ok(())
2025 }
2026
2027 /// Applies a function f on each index. Indices are visited in an order
2028 /// determined by the custom serialization.
2029 /// ```rust
2030 /// # tokio_test::block_on(async {
2031 /// # use linera_views::context::MemoryContext;
2032 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2033 /// # use linera_views::register_view::RegisterView;
2034 /// # use linera_views::views::View;
2035 /// # let context = MemoryContext::new_for_testing(());
2036 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2037 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2038 /// view.try_load_entry_mut(&28).await.unwrap();
2039 /// view.try_load_entry_mut(&24).await.unwrap();
2040 /// view.try_load_entry_mut(&23).await.unwrap();
2041 /// let mut indices = Vec::new();
2042 /// view.for_each_index(|index| {
2043 /// indices.push(index);
2044 /// Ok(())
2045 /// })
2046 /// .await
2047 /// .unwrap();
2048 /// assert_eq!(indices, vec![23, 24, 28]);
2049 /// # })
2050 /// ```
2051 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
2052 where
2053 F: FnMut(I) -> Result<(), ViewError> + Send,
2054 {
2055 self.collection
2056 .for_each_key(|key| {
2057 let index = I::from_custom_bytes(key)?;
2058 f(index)
2059 })
2060 .await?;
2061 Ok(())
2062 }
2063}
2064
2065impl<I, W> HashableView for ReentrantCustomCollectionView<W::Context, I, W>
2066where
2067 W: HashableView,
2068 I: Send + Sync + CustomSerialize,
2069{
2070 type Hasher = sha3::Sha3_256;
2071
2072 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2073 self.collection.hash_mut().await
2074 }
2075
2076 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2077 self.collection.hash().await
2078 }
2079}
2080
2081/// Type wrapping `ReentrantByteCollectionView` while memoizing the hash.
2082pub type HashedReentrantByteCollectionView<C, W> =
2083 WrappedHashableContainerView<C, ReentrantByteCollectionView<C, W>, HasherOutput>;
2084
2085/// Type wrapping `ReentrantCollectionView` while memoizing the hash.
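/// A minimal sketch of computing the memoized hash; the index and subview types are
/// illustrative and chosen to match the doctests above:
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::HashedReentrantCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::{HashableView, View};
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: HashedReentrantCollectionView<_, u64, RegisterView<_, String>> =
///     HashedReentrantCollectionView::load(context).await.unwrap();
/// let _hash = view.hash_mut().await.unwrap();
/// # })
/// ```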
2086pub type HashedReentrantCollectionView<C, I, W> =
2087 WrappedHashableContainerView<C, ReentrantCollectionView<C, I, W>, HasherOutput>;
2088
2089/// Type wrapping `ReentrantCustomCollectionView` while memoizing the hash.
2090pub type HashedReentrantCustomCollectionView<C, I, W> =
2091 WrappedHashableContainerView<C, ReentrantCustomCollectionView<C, I, W>, HasherOutput>;
2092
2093#[cfg(with_graphql)]
2094mod graphql {
2095 use std::borrow::Cow;
2096
2097 use super::{ReadGuardedView, ReentrantCollectionView, ReentrantCustomCollectionView};
2098 use crate::{
2099 graphql::{hash_name, mangle, missing_key_error, Entry, MapFilters, MapInput},
2100 views::View,
2101 };
2102
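    // `ReadGuardedView<T>` is transparent to GraphQL: its type name, registry entry, and
    // field resolution are all delegated to the wrapped `T`.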
2103 impl<T: async_graphql::OutputType> async_graphql::OutputType for ReadGuardedView<T> {
2104 fn type_name() -> Cow<'static, str> {
2105 T::type_name()
2106 }
2107
2108 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
2109 T::create_type_info(registry)
2110 }
2111
2112 async fn resolve(
2113 &self,
2114 ctx: &async_graphql::ContextSelectionSet<'_>,
2115 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
2116 ) -> async_graphql::ServerResult<async_graphql::Value> {
2117 (**self).resolve(ctx, field).await
2118 }
2119 }
2120
2121 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2122 async_graphql::TypeName for ReentrantCollectionView<C, K, V>
2123 {
2124 fn type_name() -> Cow<'static, str> {
2125 format!(
2126 "ReentrantCollectionView_{}_{}_{}",
2127 mangle(K::type_name()),
2128 mangle(V::type_name()),
2129 hash_name::<(K, V)>(),
2130 )
2131 .into()
2132 }
2133 }
2134
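    // GraphQL interface for `ReentrantCollectionView`: `keys` lists the indices, `entry`
    // loads a single subview read-only, and `entries` loads either the requested keys or all of them.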
2135 #[async_graphql::Object(cache_control(no_cache), name_type)]
2136 impl<K, V> ReentrantCollectionView<V::Context, K, V>
2137 where
2138 K: async_graphql::InputType
2139 + async_graphql::OutputType
2140 + serde::ser::Serialize
2141 + serde::de::DeserializeOwned
2142 + std::fmt::Debug
2143 + Clone,
2144 V: View + async_graphql::OutputType,
2145 MapInput<K>: async_graphql::InputType,
2146 MapFilters<K>: async_graphql::InputType,
2147 {
2148 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2149 Ok(self.indices().await?)
2150 }
2151
2152 async fn entry(
2153 &self,
2154 key: K,
2155 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2156 let value = self
2157 .try_load_entry(&key)
2158 .await?
2159 .ok_or_else(|| missing_key_error(&key))?;
2160 Ok(Entry { value, key })
2161 }
2162
2163 async fn entries(
2164 &self,
2165 input: Option<MapInput<K>>,
2166 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
2167 let keys = if let Some(keys) = input
2168 .and_then(|input| input.filters)
2169 .and_then(|filters| filters.keys)
2170 {
2171 keys
2172 } else {
2173 self.indices().await?
2174 };
2175
2176 let mut values = vec![];
2177 for key in keys {
2178 let value = self
2179 .try_load_entry(&key)
2180 .await?
2181 .ok_or_else(|| missing_key_error(&key))?;
2182 values.push(Entry { value, key })
2183 }
2184
2185 Ok(values)
2186 }
2187 }
2188
2190 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2191 async_graphql::TypeName for ReentrantCustomCollectionView<C, K, V>
2192 {
2193 fn type_name() -> Cow<'static, str> {
2194 format!(
2195 "ReentrantCustomCollectionView_{}_{}_{:08x}",
2196 mangle(K::type_name()),
2197 mangle(V::type_name()),
2198 hash_name::<(K, V)>(),
2199 )
2200 .into()
2201 }
2202 }
2203
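    // The same GraphQL interface for `ReentrantCustomCollectionView`, with keys encoded via
    // `CustomSerialize` rather than the default serialization.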
2204 #[async_graphql::Object(cache_control(no_cache), name_type)]
2205 impl<K, V> ReentrantCustomCollectionView<V::Context, K, V>
2206 where
2207 K: async_graphql::InputType
2208 + async_graphql::OutputType
2209 + crate::common::CustomSerialize
2210 + std::fmt::Debug
2211 + Clone,
2212 V: View + async_graphql::OutputType,
2213 MapInput<K>: async_graphql::InputType,
2214 MapFilters<K>: async_graphql::InputType,
2215 {
2216 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2217 Ok(self.indices().await?)
2218 }
2219
2220 async fn entry(
2221 &self,
2222 key: K,
2223 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2224 let value = self
2225 .try_load_entry(&key)
2226 .await?
2227 .ok_or_else(|| missing_key_error(&key))?;
2228 Ok(Entry { value, key })
2229 }
2230
2231 async fn entries(
2232 &self,
2233 input: Option<MapInput<K>>,
2234 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
2235 let keys = if let Some(keys) = input
2236 .and_then(|input| input.filters)
2237 .and_then(|filters| filters.keys)
2238 {
2239 keys
2240 } else {
2241 self.indices().await?
2242 };
2243
2244 let mut values = vec![];
2245 for key in keys {
2246 let value = self
2247 .try_load_entry(&key)
2248 .await?
2249 .ok_or_else(|| missing_key_error(&key))?;
2250 values.push(Entry { value, key })
2251 }
2252
2253 Ok(values)
2254 }
2255 }
2256}