linera_views/views/reentrant_collection_view.rs
1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5 borrow::Borrow,
6 collections::{btree_map, BTreeMap},
7 io::Write,
8 marker::PhantomData,
9 mem,
10 sync::{Arc, Mutex},
11};
12
13use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc};
14#[cfg(with_metrics)]
15use linera_base::prometheus_util::MeasureLatency as _;
16use serde::{de::DeserializeOwned, Serialize};
17
18use crate::{
19 batch::Batch,
20 common::{CustomSerialize, HasherOutput, Update},
21 context::{BaseKey, Context},
22 hashable_wrapper::WrappedHashableContainerView,
23 store::ReadableKeyValueStore as _,
24 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
25};
26
27#[cfg(with_metrics)]
28mod metrics {
29 use std::sync::LazyLock;
30
31 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
32 use prometheus::HistogramVec;
33
34 /// The runtime of hash computation
35 pub static REENTRANT_COLLECTION_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
36 LazyLock::new(|| {
37 register_histogram_vec(
38 "reentrant_collection_view_hash_runtime",
39 "ReentrantCollectionView hash runtime",
40 &[],
41 exponential_bucket_latencies(5.0),
42 )
43 });
44}
45
46/// A read-only accessor for a particular subview in a [`ReentrantCollectionView`].
47#[derive(Debug)]
48pub struct ReadGuardedView<T>(RwLockReadGuardArc<T>);
49
50impl<T> std::ops::Deref for ReadGuardedView<T> {
51 type Target = T;
52 fn deref(&self) -> &T {
53 self.0.deref()
54 }
55}
56
57/// A read-write accessor for a particular subview in a [`ReentrantCollectionView`].
58#[derive(Debug)]
59pub struct WriteGuardedView<T>(RwLockWriteGuardArc<T>);
60
61impl<T> std::ops::Deref for WriteGuardedView<T> {
62 type Target = T;
63 fn deref(&self) -> &T {
64 self.0.deref()
65 }
66}
67
68impl<T> std::ops::DerefMut for WriteGuardedView<T> {
69 fn deref_mut(&mut self) -> &mut T {
70 self.0.deref_mut()
71 }
72}
73
74/// A view that supports accessing a collection of views of the same kind, indexed by `Vec<u8>`,
75/// possibly several subviews at a time.
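///
/// A minimal usage sketch (assuming the same `MemoryContext::new_for_testing` helper as
/// the doc-tests below), showing several subviews borrowed for writing at the same time:
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
/// ReentrantByteCollectionView::load(context).await.unwrap();
/// let mut subviews = view.try_load_entries_mut(vec![vec![0], vec![1]]).await.unwrap();
/// *subviews[0].get_mut() = "zero".to_string();
/// *subviews[1].get_mut() = "one".to_string();
/// assert_eq!(*subviews[0].get(), "zero".to_string());
/// # })
/// ```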
76#[derive(Debug)]
77pub struct ReentrantByteCollectionView<C, W> {
78 /// The view [`Context`].
79 context: C,
80 /// Whether the current persisted data will be completely erased and replaced on the next flush.
81 delete_storage_first: bool,
82 /// Entries that may have staged changes.
83 updates: BTreeMap<Vec<u8>, Update<Arc<RwLock<W>>>>,
84 /// Entries cached in memory that have the exact same state as in the persistent storage.
85 cached_entries: Mutex<BTreeMap<Vec<u8>, Arc<RwLock<W>>>>,
86}
87
88/// We need to find new base keys in order to implement the collection view.
89/// We do this by appending a value to the base key.
90///
91/// Sub-views in a collection share a common key prefix, like in other view types. However,
92/// just concatenating the shared prefix with sub-view keys would make it impossible to distinguish whether a
93/// given key belongs to a child sub-view or to a grandchild sub-view (consider, for example, a
94/// collection stored inside another collection).
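///
/// As an illustration (assuming that `base_tag_index` simply concatenates the base key,
/// the tag byte, and the index bytes), an entry with short key `k` in a collection with
/// base key `B` uses the following keys:
/// * `B ++ [KeyTag::Index as u8] ++ k` marks the existence of the entry (stored with an
/// empty value);
/// * `B ++ [KeyTag::Subview as u8] ++ k` is the base key prefix handed to the sub-view.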
95#[repr(u8)]
96enum KeyTag {
97 /// Prefix for an index key, marking the existence of an entry in the collection.
98 Index = MIN_VIEW_TAG,
99 /// Prefix for the keys of the sub-view itself.
100 Subview,
101}
102
103impl<W: View> View for ReentrantByteCollectionView<W::Context, W> {
104 const NUM_INIT_KEYS: usize = 0;
105
106 type Context = W::Context;
107
108 fn context(&self) -> &Self::Context {
109 &self.context
110 }
111
112 fn pre_load(_context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
113 Ok(Vec::new())
114 }
115
116 fn post_load(context: Self::Context, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
117 Ok(Self {
118 context,
119 delete_storage_first: false,
120 updates: BTreeMap::new(),
121 cached_entries: Mutex::new(BTreeMap::new()),
122 })
123 }
124
125 async fn load(context: Self::Context) -> Result<Self, ViewError> {
126 Self::post_load(context, &[])
127 }
128
129 fn rollback(&mut self) {
130 self.delete_storage_first = false;
131 self.updates.clear();
132 }
133
134 async fn has_pending_changes(&self) -> bool {
135 if self.delete_storage_first {
136 return true;
137 }
138 !self.updates.is_empty()
139 }
140
141 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
142 let mut delete_view = false;
143 if self.delete_storage_first {
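// The whole key prefix is deleted first; any entries still set in `updates` are
// re-flushed below, in which case the view itself is not considered deleted.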
144 delete_view = true;
145 batch.delete_key_prefix(self.context.base_key().bytes.clone());
146 for (index, update) in mem::take(&mut self.updates) {
147 if let Update::Set(view) = update {
148 let mut view = Arc::try_unwrap(view)
149 .map_err(|_| ViewError::TryLockError(index.clone()))?
150 .into_inner();
151 view.flush(batch)?;
152 self.add_index(batch, &index);
153 delete_view = false;
154 }
155 }
156 } else {
157 for (index, update) in mem::take(&mut self.updates) {
158 match update {
159 Update::Set(view) => {
160 let mut view = Arc::try_unwrap(view)
161 .map_err(|_| ViewError::TryLockError(index.clone()))?
162 .into_inner();
163 view.flush(batch)?;
164 self.add_index(batch, &index);
165 }
166 Update::Removed => {
167 let key_subview = self.get_subview_key(&index);
168 let key_index = self.get_index_key(&index);
169 batch.delete_key(key_index);
170 batch.delete_key_prefix(key_subview);
171 }
172 }
173 }
174 }
175 self.delete_storage_first = false;
176 Ok(delete_view)
177 }
178
179 fn clear(&mut self) {
180 self.delete_storage_first = true;
181 self.updates.clear();
182 self.cached_entries.get_mut().unwrap().clear();
183 }
184}
185
186impl<W: ClonableView> ClonableView for ReentrantByteCollectionView<W::Context, W> {
187 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
188 let cloned_updates = self
189 .updates
190 .iter()
191 .map(|(key, value)| {
192 let cloned_value = match value {
193 Update::Removed => Update::Removed,
194 Update::Set(view_lock) => {
195 let mut view = view_lock
196 .try_write()
197 .ok_or(ViewError::TryLockError(key.to_vec()))?;
198
199 Update::Set(Arc::new(RwLock::new(view.clone_unchecked()?)))
200 }
201 };
202 Ok((key.clone(), cloned_value))
203 })
204 .collect::<Result<_, ViewError>>()?;
205
206 Ok(ReentrantByteCollectionView {
207 context: self.context.clone(),
208 delete_storage_first: self.delete_storage_first,
209 updates: cloned_updates,
210 cached_entries: Mutex::new(BTreeMap::new()),
211 })
212 }
213}
214
215impl<C: Context, W> ReentrantByteCollectionView<C, W> {
216 fn get_index_key(&self, index: &[u8]) -> Vec<u8> {
217 self.context
218 .base_key()
219 .base_tag_index(KeyTag::Index as u8, index)
220 }
221
222 fn get_subview_key(&self, index: &[u8]) -> Vec<u8> {
223 self.context
224 .base_key()
225 .base_tag_index(KeyTag::Subview as u8, index)
226 }
227
228 fn add_index(&self, batch: &mut Batch, index: &[u8]) {
229 let key = self.get_index_key(index);
230 batch.put_key_value_bytes(key, vec![]);
231 }
232}
233
234impl<W: View> ReentrantByteCollectionView<W::Context, W> {
235 /// Reads the view and, if missing, returns the default view.
236 async fn wrapped_view(
237 context: &W::Context,
238 delete_storage_first: bool,
239 short_key: &[u8],
240 ) -> Result<Arc<RwLock<W>>, ViewError> {
241 let key = context
242 .base_key()
243 .base_tag_index(KeyTag::Subview as u8, short_key);
244 let context = context.clone_with_base_key(key);
245 // Obtain a view and set its pending state to the default (e.g. empty) state
246 let view = if delete_storage_first {
247 W::new(context)?
248 } else {
249 W::load(context).await?
250 };
251 Ok(Arc::new(RwLock::new(view)))
252 }
253
254 /// Loads the view and inserts it into `updates` if needed.
255 /// If the entry is missing, it is set to the default.
256 async fn try_load_view_mut(&mut self, short_key: &[u8]) -> Result<Arc<RwLock<W>>, ViewError> {
257 use btree_map::Entry::*;
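// A `Removed` entry is replaced by a fresh default view; a vacant entry is taken from
// the cache when possible and otherwise loaded from storage.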
258 Ok(match self.updates.entry(short_key.to_owned()) {
259 Occupied(mut entry) => match entry.get_mut() {
260 Update::Set(view) => view.clone(),
261 entry @ Update::Removed => {
262 let wrapped_view = Self::wrapped_view(&self.context, true, short_key).await?;
263 *entry = Update::Set(wrapped_view.clone());
264 wrapped_view
265 }
266 },
267 Vacant(entry) => {
268 let wrapped_view = match self.cached_entries.get_mut().unwrap().remove(short_key) {
269 Some(view) => view,
270 None => {
271 Self::wrapped_view(&self.context, self.delete_storage_first, short_key)
272 .await?
273 }
274 };
275 entry.insert(Update::Set(wrapped_view.clone()));
276 wrapped_view
277 }
278 })
279 }
280
281 /// Loads the view from `updates` if available.
282 /// Otherwise, the entry is loaded from storage; if it is
283 /// missing there too, `None` is returned.
284 async fn try_load_view(&self, short_key: &[u8]) -> Result<Option<Arc<RwLock<W>>>, ViewError> {
285 Ok(if let Some(entry) = self.updates.get(short_key) {
286 match entry {
287 Update::Set(view) => Some(view.clone()),
288 _entry @ Update::Removed => None,
289 }
290 } else if self.delete_storage_first {
291 None
292 } else {
293 let view = {
294 let cached_entries = self.cached_entries.lock().unwrap();
295 let view = cached_entries.get(short_key);
296 view.cloned()
297 };
298 if let Some(view) = view {
299 Some(view.clone())
300 } else {
301 let key_index = self
302 .context
303 .base_key()
304 .base_tag_index(KeyTag::Index as u8, short_key);
305 if self.context.store().contains_key(&key_index).await? {
306 let view = Self::wrapped_view(&self.context, false, short_key).await?;
307 let mut cached_entries = self.cached_entries.lock().unwrap();
308 cached_entries.insert(short_key.to_owned(), view.clone());
309 Some(view)
310 } else {
311 None
312 }
313 }
314 })
315 }
316
317 /// Loads a subview for the data at the given index in the collection. If an entry
318 /// is absent then a default entry is added to the collection. The resulting view
319 /// can be modified.
320 /// ```rust
321 /// # tokio_test::block_on(async {
322 /// # use linera_views::context::MemoryContext;
323 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
324 /// # use linera_views::register_view::RegisterView;
325 /// # use linera_views::views::View;
326 /// # let context = MemoryContext::new_for_testing(());
327 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
328 /// ReentrantByteCollectionView::load(context).await.unwrap();
329 /// let subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
330 /// let value = subview.get();
331 /// assert_eq!(*value, String::default());
332 /// # })
333 /// ```
334 pub async fn try_load_entry_mut(
335 &mut self,
336 short_key: &[u8],
337 ) -> Result<WriteGuardedView<W>, ViewError> {
338 Ok(WriteGuardedView(
339 self.try_load_view_mut(short_key)
340 .await?
341 .try_write_arc()
342 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
343 ))
344 }
345
346 /// Loads a subview at the given index in the collection and gives read-only access to the data.
347 /// If an entry is absent then `None` is returned.
348 /// ```rust
349 /// # tokio_test::block_on(async {
350 /// # use linera_views::context::MemoryContext;
351 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
352 /// # use linera_views::register_view::RegisterView;
353 /// # use linera_views::views::View;
354 /// # let context = MemoryContext::new_for_testing(());
355 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
356 /// ReentrantByteCollectionView::load(context).await.unwrap();
357 /// {
358 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
359 /// }
360 /// let subview = view.try_load_entry(&[0, 1]).await.unwrap().unwrap();
361 /// let value = subview.get();
362 /// assert_eq!(*value, String::default());
363 /// # })
364 /// ```
365 pub async fn try_load_entry(
366 &self,
367 short_key: &[u8],
368 ) -> Result<Option<ReadGuardedView<W>>, ViewError> {
369 match self.try_load_view(short_key).await? {
370 None => Ok(None),
371 Some(view) => Ok(Some(ReadGuardedView(
372 view.try_read_arc()
373 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
374 ))),
375 }
376 }
377
378 /// Returns `true` if the collection contains a value for the specified key.
379 /// ```rust
380 /// # tokio_test::block_on(async {
381 /// # use linera_views::context::MemoryContext;
382 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
383 /// # use linera_views::register_view::RegisterView;
384 /// # use linera_views::views::View;
385 /// # let context = MemoryContext::new_for_testing(());
386 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
387 /// ReentrantByteCollectionView::load(context).await.unwrap();
388 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
389 /// assert!(view.contains_key(&[0, 1]).await.unwrap());
390 /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
391 /// # })
392 /// ```
393 pub async fn contains_key(&self, short_key: &[u8]) -> Result<bool, ViewError> {
394 Ok(if let Some(entry) = self.updates.get(short_key) {
395 match entry {
396 Update::Set(_view) => true,
397 Update::Removed => false,
398 }
399 } else if self.delete_storage_first {
400 false
401 } else if self.cached_entries.lock().unwrap().contains_key(short_key) {
402 true
403 } else {
404 let key_index = self
405 .context
406 .base_key()
407 .base_tag_index(KeyTag::Index as u8, short_key);
408 self.context.store().contains_key(&key_index).await?
409 })
410 }
411
412 /// Removes an entry. If absent then nothing happens.
413 /// ```rust
414 /// # tokio_test::block_on(async {
415 /// # use linera_views::context::MemoryContext;
416 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
417 /// # use linera_views::register_view::RegisterView;
418 /// # use linera_views::views::View;
419 /// # let context = MemoryContext::new_for_testing(());
420 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
421 /// ReentrantByteCollectionView::load(context).await.unwrap();
422 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
423 /// let value = subview.get_mut();
424 /// assert_eq!(*value, String::default());
425 /// view.remove_entry(vec![0, 1]);
426 /// let keys = view.keys().await.unwrap();
427 /// assert_eq!(keys.len(), 0);
428 /// # })
429 /// ```
430 pub fn remove_entry(&mut self, short_key: Vec<u8>) {
431 self.cached_entries.get_mut().unwrap().remove(&short_key);
432 if self.delete_storage_first {
433 // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
434 self.updates.remove(&short_key);
435 } else {
436 self.updates.insert(short_key, Update::Removed);
437 }
438 }
439
440 /// Resets an entry to the default view, so that the previously stored state is overwritten on the next flush.
441 /// ```rust
442 /// # tokio_test::block_on(async {
443 /// # use linera_views::context::MemoryContext;
444 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
445 /// # use linera_views::register_view::RegisterView;
446 /// # use linera_views::views::View;
447 /// # let context = MemoryContext::new_for_testing(());
448 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
449 /// ReentrantByteCollectionView::load(context).await.unwrap();
450 /// {
451 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
452 /// let value = subview.get_mut();
453 /// *value = String::from("Hello");
454 /// }
455 /// view.try_reset_entry_to_default(&[0, 1]).unwrap();
456 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
457 /// let value = subview.get_mut();
458 /// assert_eq!(*value, String::default());
459 /// # })
460 /// ```
461 pub fn try_reset_entry_to_default(&mut self, short_key: &[u8]) -> Result<(), ViewError> {
462 let key = self
463 .context
464 .base_key()
465 .base_tag_index(KeyTag::Subview as u8, short_key);
466 let context = self.context.clone_with_base_key(key);
467 let view = W::new(context)?;
468 let view = Arc::new(RwLock::new(view));
469 let view = Update::Set(view);
470 self.updates.insert(short_key.to_vec(), view);
471 self.cached_entries.get_mut().unwrap().remove(short_key);
472 Ok(())
473 }
474
475 /// Gets the extra data.
476 pub fn extra(&self) -> &<W::Context as Context>::Extra {
477 self.context.extra()
478 }
479}
480
481impl<W: View> ReentrantByteCollectionView<W::Context, W> {
482 /// Loads multiple entries for writing at once.
483 /// The entries in `short_keys` have to be all distinct.
484 /// ```rust
485 /// # tokio_test::block_on(async {
486 /// # use linera_views::context::MemoryContext;
487 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
488 /// # use linera_views::register_view::RegisterView;
489 /// # use linera_views::views::View;
490 /// # let context = MemoryContext::new_for_testing(());
491 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
492 /// ReentrantByteCollectionView::load(context).await.unwrap();
493 /// {
494 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
495 /// *subview.get_mut() = "Bonjour".to_string();
496 /// }
497 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
498 /// let subviews = view.try_load_entries_mut(short_keys).await.unwrap();
499 /// let value1 = subviews[0].get();
500 /// let value2 = subviews[1].get();
501 /// assert_eq!(*value1, "Bonjour".to_string());
502 /// assert_eq!(*value2, String::default());
503 /// # })
504 /// ```
505 pub async fn try_load_entries_mut(
506 &mut self,
507 short_keys: Vec<Vec<u8>>,
508 ) -> Result<Vec<WriteGuardedView<W>>, ViewError> {
509 let cached_entries = self.cached_entries.get_mut().unwrap();
510 let mut short_keys_to_load = Vec::new();
511 let mut keys = Vec::new();
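// Entries that are already staged or cached are reused directly; the remaining ones are
// batch-loaded from storage below via `pre_load`/`post_load`.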
512 for short_key in &short_keys {
513 let key = self
514 .context
515 .base_key()
516 .base_tag_index(KeyTag::Subview as u8, short_key);
517 let context = self.context.clone_with_base_key(key);
518 match self.updates.entry(short_key.to_vec()) {
519 btree_map::Entry::Occupied(mut entry) => {
520 if let Update::Removed = entry.get() {
521 let view = W::new(context)?;
522 let view = Arc::new(RwLock::new(view));
523 entry.insert(Update::Set(view));
524 cached_entries.remove(short_key);
525 }
526 }
527 btree_map::Entry::Vacant(entry) => {
528 if self.delete_storage_first {
529 cached_entries.remove(short_key);
530 let view = W::new(context)?;
531 let view = Arc::new(RwLock::new(view));
532 entry.insert(Update::Set(view));
533 } else if let Some(view) = cached_entries.remove(short_key) {
534 entry.insert(Update::Set(view));
535 } else {
536 keys.extend(W::pre_load(&context)?);
537 short_keys_to_load.push(short_key.to_vec());
538 }
539 }
540 }
541 }
542 let values = self.context.store().read_multi_values_bytes(keys).await?;
543 for (loaded_values, short_key) in values
544 .chunks_exact(W::NUM_INIT_KEYS)
545 .zip(short_keys_to_load)
546 {
547 let key = self
548 .context
549 .base_key()
550 .base_tag_index(KeyTag::Subview as u8, &short_key);
551 let context = self.context.clone_with_base_key(key);
552 let view = W::post_load(context, loaded_values)?;
553 let wrapped_view = Arc::new(RwLock::new(view));
554 self.updates
555 .insert(short_key.to_vec(), Update::Set(wrapped_view));
556 }
557
558 short_keys
559 .into_iter()
560 .map(|short_key| {
561 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
562 unreachable!()
563 };
564 Ok(WriteGuardedView(
565 view.clone()
566 .try_write_arc()
567 .ok_or_else(|| ViewError::TryLockError(short_key))?,
568 ))
569 })
570 .collect()
571 }
572
573 /// Loads multiple entries for reading at once.
574 /// The entries in `short_keys` have to be all distinct.
575 /// ```rust
576 /// # tokio_test::block_on(async {
577 /// # use linera_views::context::MemoryContext;
578 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
579 /// # use linera_views::register_view::RegisterView;
580 /// # use linera_views::views::View;
581 /// # let context = MemoryContext::new_for_testing(());
582 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
583 /// ReentrantByteCollectionView::load(context).await.unwrap();
584 /// {
585 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
586 /// }
587 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
588 /// let subviews = view.try_load_entries(short_keys).await.unwrap();
589 /// assert!(subviews[1].is_none());
590 /// let value0 = subviews[0].as_ref().unwrap().get();
591 /// assert_eq!(*value0, String::default());
592 /// # })
593 /// ```
594 pub async fn try_load_entries(
595 &self,
596 short_keys: Vec<Vec<u8>>,
597 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> {
598 let mut results = vec![None; short_keys.len()];
599 let mut keys_to_check = Vec::new();
600 let mut keys_to_check_metadata = Vec::new();
601
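// Resolve each short key from the staged updates first, then from the cache of clean
// entries; only the remaining keys are checked against storage below.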
602 {
603 let cached_entries = self.cached_entries.lock().unwrap();
604 for (position, short_key) in short_keys.into_iter().enumerate() {
605 if let Some(update) = self.updates.get(&short_key) {
606 if let Update::Set(view) = update {
607 results[position] = Some((short_key, view.clone()));
608 }
609 } else if let Some(view) = cached_entries.get(&short_key) {
610 results[position] = Some((short_key, view.clone()));
611 } else if !self.delete_storage_first {
612 let key_index = self
613 .context
614 .base_key()
615 .base_tag_index(KeyTag::Index as u8, &short_key);
616 keys_to_check.push(key_index);
617 keys_to_check_metadata.push((position, short_key));
618 }
619 }
620 }
621
622 let found_keys = self.context.store().contains_keys(keys_to_check).await?;
623 let entries_to_load = keys_to_check_metadata
624 .into_iter()
625 .zip(found_keys)
626 .filter_map(|(metadata, found)| found.then_some(metadata))
627 .map(|(position, short_key)| {
628 let subview_key = self
629 .context
630 .base_key()
631 .base_tag_index(KeyTag::Subview as u8, &short_key);
632 let subview_context = self.context.clone_with_base_key(subview_key);
633 (position, short_key.to_owned(), subview_context)
634 })
635 .collect::<Vec<_>>();
636 if !entries_to_load.is_empty() {
637 let mut keys_to_load = Vec::with_capacity(entries_to_load.len() * W::NUM_INIT_KEYS);
638 for (_, _, context) in &entries_to_load {
639 keys_to_load.extend(W::pre_load(context)?);
640 }
641 let values = self
642 .context
643 .store()
644 .read_multi_values_bytes(keys_to_load)
645 .await?;
646 let mut cached_entries = self.cached_entries.lock().unwrap();
647 for (loaded_values, (position, short_key, context)) in
648 values.chunks_exact(W::NUM_INIT_KEYS).zip(entries_to_load)
649 {
650 let view = W::post_load(context, loaded_values)?;
651 let wrapped_view = Arc::new(RwLock::new(view));
652 cached_entries.insert(short_key.clone(), wrapped_view.clone());
653 results[position] = Some((short_key, wrapped_view));
654 }
655 }
656
657 results
658 .into_iter()
659 .map(|maybe_view| match maybe_view {
660 Some((short_key, view)) => Ok(Some(ReadGuardedView(
661 view.try_read_arc()
662 .ok_or_else(|| ViewError::TryLockError(short_key))?,
663 ))),
664 None => Ok(None),
665 })
666 .collect()
667 }
668
669 /// Loads all the entries for reading at once.
670 /// ```rust
671 /// # tokio_test::block_on(async {
672 /// # use linera_views::context::MemoryContext;
673 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
674 /// # use linera_views::register_view::RegisterView;
675 /// # use linera_views::views::View;
676 /// # let context = MemoryContext::new_for_testing(());
677 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
678 /// ReentrantByteCollectionView::load(context).await.unwrap();
679 /// {
680 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
681 /// }
682 /// let subviews = view.try_load_all_entries().await.unwrap();
683 /// assert_eq!(subviews.len(), 1);
684 /// # })
685 /// ```
686 pub async fn try_load_all_entries(
687 &self,
688 ) -> Result<Vec<(Vec<u8>, ReadGuardedView<W>)>, ViewError> {
689 let short_keys = self.keys().await?;
690 if !self.delete_storage_first {
691 let mut keys = Vec::new();
692 let mut short_keys_to_load = Vec::new();
693 {
694 let cached_entries = self.cached_entries.lock().unwrap();
695 for short_key in &short_keys {
696 if !self.updates.contains_key(short_key)
697 && !cached_entries.contains_key(short_key)
698 {
699 let key = self
700 .context
701 .base_key()
702 .base_tag_index(KeyTag::Subview as u8, short_key);
703 let context = self.context.clone_with_base_key(key);
704 keys.extend(W::pre_load(&context)?);
705 short_keys_to_load.push(short_key.to_vec());
706 }
707 }
708 }
709 let values = self.context.store().read_multi_values_bytes(keys).await?;
710 {
711 let mut cached_entries = self.cached_entries.lock().unwrap();
712 for (loaded_values, short_key) in values
713 .chunks_exact(W::NUM_INIT_KEYS)
714 .zip(short_keys_to_load)
715 {
716 let key = self
717 .context
718 .base_key()
719 .base_tag_index(KeyTag::Subview as u8, &short_key);
720 let context = self.context.clone_with_base_key(key);
721 let view = W::post_load(context, loaded_values)?;
722 let wrapped_view = Arc::new(RwLock::new(view));
723 cached_entries.insert(short_key.to_vec(), wrapped_view);
724 }
725 }
726 }
727 let cached_entries = self.cached_entries.lock().unwrap();
728 short_keys
729 .into_iter()
730 .map(|short_key| {
731 let view = if let Some(Update::Set(view)) = self.updates.get(&short_key) {
732 view.clone()
733 } else if let Some(view) = cached_entries.get(&short_key) {
734 view.clone()
735 } else {
736 unreachable!("All entries should have been loaded into memory");
737 };
738 let guard = ReadGuardedView(
739 view.try_read_arc()
740 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
741 );
742 Ok((short_key, guard))
743 })
744 .collect()
745 }
746
747 /// Loads all the entries for writing at once.
748 /// ```rust
749 /// # tokio_test::block_on(async {
750 /// # use linera_views::context::MemoryContext;
751 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
752 /// # use linera_views::register_view::RegisterView;
753 /// # use linera_views::views::View;
754 /// # let context = MemoryContext::new_for_testing(());
755 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
756 /// ReentrantByteCollectionView::load(context).await.unwrap();
757 /// {
758 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
759 /// }
760 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
761 /// assert_eq!(subviews.len(), 1);
762 /// # })
763 /// ```
764 pub async fn try_load_all_entries_mut(
765 &mut self,
766 ) -> Result<Vec<(Vec<u8>, WriteGuardedView<W>)>, ViewError> {
767 let short_keys = self.keys().await?;
768 if !self.delete_storage_first {
769 let mut keys = Vec::new();
770 let mut short_keys_to_load = Vec::new();
771 {
772 let cached_entries = self.cached_entries.get_mut().unwrap();
773 for short_key in &short_keys {
774 if !self.updates.contains_key(short_key) {
775 if let Some(view) = cached_entries.remove(short_key) {
776 self.updates.insert(short_key.to_vec(), Update::Set(view));
777 } else {
778 let key = self
779 .context
780 .base_key()
781 .base_tag_index(KeyTag::Subview as u8, short_key);
782 let context = self.context.clone_with_base_key(key);
783 keys.extend(W::pre_load(&context)?);
784 short_keys_to_load.push(short_key.to_vec());
785 }
786 }
787 }
788 }
789 let values = self.context.store().read_multi_values_bytes(keys).await?;
790 for (loaded_values, short_key) in values
791 .chunks_exact(W::NUM_INIT_KEYS)
792 .zip(short_keys_to_load)
793 {
794 let key = self
795 .context
796 .base_key()
797 .base_tag_index(KeyTag::Subview as u8, &short_key);
798 let context = self.context.clone_with_base_key(key);
799 let view = W::post_load(context, loaded_values)?;
800 let wrapped_view = Arc::new(RwLock::new(view));
801 self.updates
802 .insert(short_key.to_vec(), Update::Set(wrapped_view));
803 }
804 }
805 short_keys
806 .into_iter()
807 .map(|short_key| {
808 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
809 unreachable!("All entries should have been loaded into `updates`")
810 };
811 let guard = WriteGuardedView(
812 view.clone()
813 .try_write_arc()
814 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
815 );
816 Ok((short_key, guard))
817 })
818 .collect()
819 }
820}
821
822impl<W: View> ReentrantByteCollectionView<W::Context, W> {
823 /// Returns the list of indices in the collection in lexicographic order.
824 /// ```rust
825 /// # tokio_test::block_on(async {
826 /// # use linera_views::context::MemoryContext;
827 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
828 /// # use linera_views::register_view::RegisterView;
829 /// # use linera_views::views::View;
830 /// # let context = MemoryContext::new_for_testing(());
831 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
832 /// ReentrantByteCollectionView::load(context).await.unwrap();
833 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
834 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
835 /// let keys = view.keys().await.unwrap();
836 /// assert_eq!(keys, vec![vec![0, 1], vec![0, 2]]);
837 /// # })
838 /// ```
839 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
840 let mut keys = Vec::new();
841 self.for_each_key(|key| {
842 keys.push(key.to_vec());
843 Ok(())
844 })
845 .await?;
846 Ok(keys)
847 }
848
849 /// Returns the number of indices of the collection.
850 /// ```rust
851 /// # tokio_test::block_on(async {
852 /// # use linera_views::context::MemoryContext;
853 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
854 /// # use linera_views::register_view::RegisterView;
855 /// # use linera_views::views::View;
856 /// # let context = MemoryContext::new_for_testing(());
857 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
858 /// ReentrantByteCollectionView::load(context).await.unwrap();
859 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
860 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
861 /// assert_eq!(view.count().await.unwrap(), 2);
862 /// # })
863 /// ```
864 pub async fn count(&self) -> Result<usize, ViewError> {
865 let mut count = 0;
866 self.for_each_key(|_key| {
867 count += 1;
868 Ok(())
869 })
870 .await?;
871 Ok(count)
872 }
873
874 /// Applies a function `f` on each index (i.e. key). Keys are visited in
875 /// lexicographic order. If the function returns `false`, then the loop
876 /// ends prematurely.
877 /// ```rust
878 /// # tokio_test::block_on(async {
879 /// # use linera_views::context::MemoryContext;
880 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
881 /// # use linera_views::register_view::RegisterView;
882 /// # use linera_views::views::View;
883 /// # let context = MemoryContext::new_for_testing(());
884 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
885 /// ReentrantByteCollectionView::load(context).await.unwrap();
886 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
887 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
888 /// let mut count = 0;
889 /// view.for_each_key_while(|_key| {
890 /// count += 1;
891 /// Ok(count < 1)
892 /// })
893 /// .await
894 /// .unwrap();
895 /// assert_eq!(count, 1);
896 /// # })
897 /// ```
898 pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
899 where
900 F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
901 {
902 let mut updates = self.updates.iter();
903 let mut update = updates.next();
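// Merge the staged updates with the index keys found in storage, visiting the union of
// both in lexicographic order and skipping entries marked as removed.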
904 if !self.delete_storage_first {
905 let base = self.get_index_key(&[]);
906 for index in self.context.store().find_keys_by_prefix(&base).await? {
907 loop {
908 match update {
909 Some((key, value)) if key <= &index => {
910 if let Update::Set(_) = value {
911 if !f(key)? {
912 return Ok(());
913 }
914 }
915 update = updates.next();
916 if key == &index {
917 break;
918 }
919 }
920 _ => {
921 if !f(&index)? {
922 return Ok(());
923 }
924 break;
925 }
926 }
927 }
928 }
929 }
930 while let Some((key, value)) = update {
931 if let Update::Set(_) = value {
932 if !f(key)? {
933 return Ok(());
934 }
935 }
936 update = updates.next();
937 }
938 Ok(())
939 }
940
941 /// Applies a function `f` on each index (i.e. key). Keys are visited in
942 /// lexicographic order.
943 /// ```rust
944 /// # tokio_test::block_on(async {
945 /// # use linera_views::context::MemoryContext;
946 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
947 /// # use linera_views::register_view::RegisterView;
948 /// # use linera_views::views::View;
949 /// # let context = MemoryContext::new_for_testing(());
950 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
951 /// ReentrantByteCollectionView::load(context).await.unwrap();
952 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
953 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
954 /// let mut count = 0;
955 /// view.for_each_key(|_key| {
956 /// count += 1;
957 /// Ok(())
958 /// })
959 /// .await
960 /// .unwrap();
961 /// assert_eq!(count, 2);
962 /// # })
963 /// ```
964 pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
965 where
966 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
967 {
968 self.for_each_key_while(|key| {
969 f(key)?;
970 Ok(true)
971 })
972 .await
973 }
974}
975
976impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W> {
977 type Hasher = sha3::Sha3_256;
978
979 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
980 #[cfg(with_metrics)]
981 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
982 let mut hasher = sha3::Sha3_256::default();
983 let keys = self.keys().await?;
984 let count = keys.len() as u32;
985 hasher.update_with_bcs_bytes(&count)?;
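// The hash commits to the number of entries, then to each key followed by the hash of
// the corresponding subview.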
986 let cached_entries = self.cached_entries.get_mut().unwrap();
987 for key in keys {
988 hasher.update_with_bytes(&key)?;
989 let hash = if let Some(entry) = self.updates.get_mut(&key) {
990 let Update::Set(view) = entry else {
991 unreachable!();
992 };
993 let mut view = view
994 .try_write_arc()
995 .ok_or_else(|| ViewError::TryLockError(key))?;
996 view.hash_mut().await?
997 } else if let Some(view) = cached_entries.get_mut(&key) {
998 let mut view = view
999 .try_write_arc()
1000 .ok_or_else(|| ViewError::TryLockError(key))?;
1001 view.hash_mut().await?
1002 } else {
1003 let key = self
1004 .context
1005 .base_key()
1006 .base_tag_index(KeyTag::Subview as u8, &key);
1007 let context = self.context.clone_with_base_key(key);
1008 let mut view = W::load(context).await?;
1009 view.hash_mut().await?
1010 };
1011 hasher.write_all(hash.as_ref())?;
1012 }
1013 Ok(hasher.finalize())
1014 }
1015
1016 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1017 #[cfg(with_metrics)]
1018 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
1019 let mut hasher = sha3::Sha3_256::default();
1020 let keys = self.keys().await?;
1021 let count = keys.len() as u32;
1022 hasher.update_with_bcs_bytes(&count)?;
1023 let mut cached_entries_result = Vec::new();
1024 {
1025 let cached_entries = self.cached_entries.lock().unwrap();
1026 for key in &keys {
1027 cached_entries_result.push(cached_entries.get(key).cloned());
1028 }
1029 }
1030 for (key, cached_entry) in keys.into_iter().zip(cached_entries_result) {
1031 hasher.update_with_bytes(&key)?;
1032 let hash = if let Some(entry) = self.updates.get(&key) {
1033 let Update::Set(view) = entry else {
1034 unreachable!();
1035 };
1036 let view = view
1037 .try_read_arc()
1038 .ok_or_else(|| ViewError::TryLockError(key))?;
1039 view.hash().await?
1040 } else if let Some(view) = cached_entry {
1041 let view = view
1042 .try_read_arc()
1043 .ok_or_else(|| ViewError::TryLockError(key))?;
1044 view.hash().await?
1045 } else {
1046 let key = self
1047 .context
1048 .base_key()
1049 .base_tag_index(KeyTag::Subview as u8, &key);
1050 let context = self.context.clone_with_base_key(key);
1051 let view = W::load(context).await?;
1052 view.hash().await?
1053 };
1054 hasher.write_all(hash.as_ref())?;
1055 }
1056 Ok(hasher.finalize())
1057 }
1058}
1059
1060/// A view that supports accessing a collection of views of the same kind, indexed by keys,
1061/// possibly several subviews at a time.
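///
/// A minimal sketch with a typed index (mirroring the doc-tests on the methods below):
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
/// # use linera_views::register_view::RegisterView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
/// ReentrantCollectionView::load(context).await.unwrap();
/// {
/// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
/// *subview.get_mut() = "hello".to_string();
/// }
/// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
/// assert_eq!(*subview.get(), "hello".to_string());
/// assert_eq!(view.indices().await.unwrap().len(), 1);
/// # })
/// ```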
1062#[derive(Debug)]
1063pub struct ReentrantCollectionView<C, I, W> {
1064 collection: ReentrantByteCollectionView<C, W>,
1065 _phantom: PhantomData<I>,
1066}
1067
1068impl<I, W> View for ReentrantCollectionView<W::Context, I, W>
1069where
1070 W: View,
1071 I: Send + Sync + Serialize + DeserializeOwned,
1072{
1073 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1074
1075 type Context = W::Context;
1076
1077 fn context(&self) -> &Self::Context {
1078 self.collection.context()
1079 }
1080
1081 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1082 ReentrantByteCollectionView::<W::Context, W>::pre_load(context)
1083 }
1084
1085 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1086 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1087 Ok(ReentrantCollectionView {
1088 collection,
1089 _phantom: PhantomData,
1090 })
1091 }
1092
1093 async fn load(context: Self::Context) -> Result<Self, ViewError> {
1094 Self::post_load(context, &[])
1095 }
1096
1097 fn rollback(&mut self) {
1098 self.collection.rollback()
1099 }
1100
1101 async fn has_pending_changes(&self) -> bool {
1102 self.collection.has_pending_changes().await
1103 }
1104
1105 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1106 self.collection.flush(batch)
1107 }
1108
1109 fn clear(&mut self) {
1110 self.collection.clear()
1111 }
1112}
1113
1114impl<I, W> ClonableView for ReentrantCollectionView<W::Context, I, W>
1115where
1116 W: ClonableView,
1117 I: Send + Sync + Serialize + DeserializeOwned,
1118{
1119 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
1120 Ok(ReentrantCollectionView {
1121 collection: self.collection.clone_unchecked()?,
1122 _phantom: PhantomData,
1123 })
1124 }
1125}
1126
1127impl<I, W> ReentrantCollectionView<W::Context, I, W>
1128where
1129 W: View,
1130 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1131{
1132 /// Loads a subview for the data at the given index in the collection. If an entry
1133 /// is absent then a default entry is added to the collection. The obtained view can
1134 /// then be modified.
1135 /// ```rust
1136 /// # tokio_test::block_on(async {
1137 /// # use linera_views::context::MemoryContext;
1138 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1139 /// # use linera_views::register_view::RegisterView;
1140 /// # use linera_views::views::View;
1141 /// # let context = MemoryContext::new_for_testing(());
1142 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1143 /// ReentrantCollectionView::load(context).await.unwrap();
1144 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1145 /// let value = subview.get();
1146 /// assert_eq!(*value, String::default());
1147 /// # })
1148 /// ```
1149 pub async fn try_load_entry_mut<Q>(
1150 &mut self,
1151 index: &Q,
1152 ) -> Result<WriteGuardedView<W>, ViewError>
1153 where
1154 I: Borrow<Q>,
1155 Q: Serialize + ?Sized,
1156 {
1157 let short_key = BaseKey::derive_short_key(index)?;
1158 self.collection.try_load_entry_mut(&short_key).await
1159 }
1160
1161 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1162 /// If an entry is absent then `None` is returned.
1163 /// ```rust
1164 /// # tokio_test::block_on(async {
1165 /// # use linera_views::context::MemoryContext;
1166 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1167 /// # use linera_views::register_view::RegisterView;
1168 /// # use linera_views::views::View;
1169 /// # let context = MemoryContext::new_for_testing(());
1170 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1171 /// ReentrantCollectionView::load(context).await.unwrap();
1172 /// {
1173 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1174 /// }
1175 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1176 /// let value = subview.get();
1177 /// assert_eq!(*value, String::default());
1178 /// # })
1179 /// ```
1180 pub async fn try_load_entry<Q>(
1181 &self,
1182 index: &Q,
1183 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1184 where
1185 I: Borrow<Q>,
1186 Q: Serialize + ?Sized,
1187 {
1188 let short_key = BaseKey::derive_short_key(index)?;
1189 self.collection.try_load_entry(&short_key).await
1190 }
1191
1192 /// Returns `true` if the collection contains a value for the specified key.
1193 /// ```rust
1194 /// # tokio_test::block_on(async {
1195 /// # use linera_views::context::MemoryContext;
1196 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1197 /// # use linera_views::register_view::RegisterView;
1198 /// # use linera_views::views::View;
1199 /// # let context = MemoryContext::new_for_testing(());
1200 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1201 /// ReentrantCollectionView::load(context).await.unwrap();
1202 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1203 /// assert!(view.contains_key(&23).await.unwrap());
1204 /// assert!(!view.contains_key(&24).await.unwrap());
1205 /// # })
1206 /// ```
1207 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1208 where
1209 I: Borrow<Q>,
1210 Q: Serialize + ?Sized,
1211 {
1212 let short_key = BaseKey::derive_short_key(index)?;
1213 self.collection.contains_key(&short_key).await
1214 }
1215
1216 /// Marks the entry so that it is removed in the next flush.
1217 /// ```rust
1218 /// # tokio_test::block_on(async {
1219 /// # use linera_views::context::MemoryContext;
1220 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1221 /// # use linera_views::register_view::RegisterView;
1222 /// # use linera_views::views::View;
1223 /// # let context = MemoryContext::new_for_testing(());
1224 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1225 /// ReentrantCollectionView::load(context).await.unwrap();
1226 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1227 /// let value = subview.get_mut();
1228 /// assert_eq!(*value, String::default());
1229 /// view.remove_entry(&23);
1230 /// let keys = view.indices().await.unwrap();
1231 /// assert_eq!(keys.len(), 0);
1232 /// # })
1233 /// ```
1234 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1235 where
1236 I: Borrow<Q>,
1237 Q: Serialize + ?Sized,
1238 {
1239 let short_key = BaseKey::derive_short_key(index)?;
1240 self.collection.remove_entry(short_key);
1241 Ok(())
1242 }
1243
1244 /// Resets an entry to the default view, so that the previously stored state is overwritten on the next flush.
1245 /// ```rust
1246 /// # tokio_test::block_on(async {
1247 /// # use linera_views::context::MemoryContext;
1248 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1249 /// # use linera_views::register_view::RegisterView;
1250 /// # use linera_views::views::View;
1251 /// # let context = MemoryContext::new_for_testing(());
1252 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1253 /// ReentrantCollectionView::load(context).await.unwrap();
1254 /// {
1255 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1256 /// let value = subview.get_mut();
1257 /// *value = String::from("Hello");
1258 /// }
1259 /// view.try_reset_entry_to_default(&23).unwrap();
1260 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1261 /// let value = subview.get_mut();
1262 /// assert_eq!(*value, String::default());
1263 /// # })
1264 /// ```
1265 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1266 where
1267 I: Borrow<Q>,
1268 Q: Serialize + ?Sized,
1269 {
1270 let short_key = BaseKey::derive_short_key(index)?;
1271 self.collection.try_reset_entry_to_default(&short_key)
1272 }
1273
1274 /// Gets the extra data.
1275 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1276 self.collection.extra()
1277 }
1278}
1279
1280impl<I, W> ReentrantCollectionView<W::Context, I, W>
1281where
1282 W: View,
1283 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1284{
1285 /// Loads multiple entries for writing at once.
1286 /// The entries in `indices` have to be all distinct.
1287 /// ```rust
1288 /// # tokio_test::block_on(async {
1289 /// # use linera_views::context::MemoryContext;
1290 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1291 /// # use linera_views::register_view::RegisterView;
1292 /// # use linera_views::views::View;
1293 /// # let context = MemoryContext::new_for_testing(());
1294 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1295 /// ReentrantCollectionView::load(context).await.unwrap();
1296 /// let indices = vec![23, 42];
1297 /// let subviews = view.try_load_entries_mut(&indices).await.unwrap();
1298 /// let value1 = subviews[0].get();
1299 /// let value2 = subviews[1].get();
1300 /// assert_eq!(*value1, String::default());
1301 /// assert_eq!(*value2, String::default());
1302 /// # })
1303 /// ```
1304 pub async fn try_load_entries_mut<'a, Q>(
1305 &'a mut self,
1306 indices: impl IntoIterator<Item = &'a Q>,
1307 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1308 where
1309 I: Borrow<Q>,
1310 Q: Serialize + 'a,
1311 {
1312 let short_keys = indices
1313 .into_iter()
1314 .map(|index| BaseKey::derive_short_key(index))
1315 .collect::<Result<_, _>>()?;
1316 self.collection.try_load_entries_mut(short_keys).await
1317 }
1318
1319 /// Loads multiple entries for reading at once.
1320 /// The entries in `indices` have to be all distinct.
1321 /// ```rust
1322 /// # tokio_test::block_on(async {
1323 /// # use linera_views::context::MemoryContext;
1324 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1325 /// # use linera_views::register_view::RegisterView;
1326 /// # use linera_views::views::View;
1327 /// # let context = MemoryContext::new_for_testing(());
1328 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1329 /// ReentrantCollectionView::load(context).await.unwrap();
1330 /// {
1331 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1332 /// }
1333 /// let indices = vec![23, 42];
1334 /// let subviews = view.try_load_entries(&indices).await.unwrap();
1335 /// assert!(subviews[1].is_none());
1336 /// let value0 = subviews[0].as_ref().unwrap().get();
1337 /// assert_eq!(*value0, String::default());
1338 /// # })
1339 /// ```
1340 pub async fn try_load_entries<'a, Q>(
1341 &'a self,
1342 indices: impl IntoIterator<Item = &'a Q>,
1343 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1344 where
1345 I: Borrow<Q>,
1346 Q: Serialize + 'a,
1347 {
1348 let short_keys = indices
1349 .into_iter()
1350 .map(|index| BaseKey::derive_short_key(index))
1351 .collect::<Result<_, _>>()?;
1352 self.collection.try_load_entries(short_keys).await
1353 }
1354
1355 /// Loads all the entries for writing at once.
1357 /// ```rust
1358 /// # tokio_test::block_on(async {
1359 /// # use linera_views::context::MemoryContext;
1360 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1361 /// # use linera_views::register_view::RegisterView;
1362 /// # use linera_views::views::View;
1363 /// # let context = MemoryContext::new_for_testing(());
1364 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1365 /// ReentrantCollectionView::load(context).await.unwrap();
1366 /// {
1367 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1368 /// }
1369 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
1370 /// assert_eq!(subviews.len(), 1);
1371 /// # })
1372 /// ```
1373 pub async fn try_load_all_entries_mut<'a, Q>(
1374 &'a mut self,
1375 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError>
1376 where
1377 I: Borrow<Q>,
1378 Q: Serialize + 'a,
1379 {
1380 let results = self.collection.try_load_all_entries_mut().await?;
1381 results
1382 .into_iter()
1383 .map(|(short_key, view)| {
1384 let index = BaseKey::deserialize_value(&short_key)?;
1385 Ok((index, view))
1386 })
1387 .collect()
1388 }
1389
1390 /// Loads all the entries for reading at once.
1392 /// ```rust
1393 /// # tokio_test::block_on(async {
1394 /// # use linera_views::context::MemoryContext;
1395 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1396 /// # use linera_views::register_view::RegisterView;
1397 /// # use linera_views::views::View;
1398 /// # let context = MemoryContext::new_for_testing(());
1399 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1400 /// ReentrantCollectionView::load(context).await.unwrap();
1401 /// {
1402 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1403 /// }
1404 /// let subviews = view.try_load_all_entries().await.unwrap();
1405 /// assert_eq!(subviews.len(), 1);
1406 /// # })
1407 /// ```
1408 pub async fn try_load_all_entries<'a, Q>(
1409 &'a self,
1410 ) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError>
1411 where
1412 I: Borrow<Q>,
1413 Q: Serialize + 'a,
1414 {
1415 let results = self.collection.try_load_all_entries().await?;
1416 results
1417 .into_iter()
1418 .map(|(short_key, view)| {
1419 let index = BaseKey::deserialize_value(&short_key)?;
1420 Ok((index, view))
1421 })
1422 .collect()
1423 }
1424}
1425
1426impl<I, W> ReentrantCollectionView<W::Context, I, W>
1427where
1428 W: View,
1429 I: Sync + Clone + Send + Serialize + DeserializeOwned,
1430{
1431 /// Returns the list of indices in the collection in an order determined
1432 /// by serialization.
1433 /// ```rust
1434 /// # tokio_test::block_on(async {
1435 /// # use linera_views::context::MemoryContext;
1436 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1437 /// # use linera_views::register_view::RegisterView;
1438 /// # use linera_views::views::View;
1439 /// # let context = MemoryContext::new_for_testing(());
1440 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1441 /// ReentrantCollectionView::load(context).await.unwrap();
1442 /// view.try_load_entry_mut(&23).await.unwrap();
1443 /// view.try_load_entry_mut(&25).await.unwrap();
1444 /// let indices = view.indices().await.unwrap();
1445 /// assert_eq!(indices.len(), 2);
1446 /// # })
1447 /// ```
1448 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1449 let mut indices = Vec::new();
1450 self.for_each_index(|index| {
1451 indices.push(index);
1452 Ok(())
1453 })
1454 .await?;
1455 Ok(indices)
1456 }
1457
1458 /// Returns the number of indices in the collection.
1459 /// ```rust
1460 /// # tokio_test::block_on(async {
1461 /// # use linera_views::context::MemoryContext;
1462 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1463 /// # use linera_views::register_view::RegisterView;
1464 /// # use linera_views::views::View;
1465 /// # let context = MemoryContext::new_for_testing(());
1466 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1467 /// ReentrantCollectionView::load(context).await.unwrap();
1468 /// view.try_load_entry_mut(&23).await.unwrap();
1469 /// view.try_load_entry_mut(&25).await.unwrap();
1470 /// assert_eq!(view.count().await.unwrap(), 2);
1471 /// # })
1472 /// ```
1473 pub async fn count(&self) -> Result<usize, ViewError> {
1474 self.collection.count().await
1475 }
1476
1477 /// Applies a function `f` on each index. Indices are visited in an order
1478 /// determined by the serialization. If the function `f` returns `false`, then
1479 /// the loop ends prematurely.
1480 /// ```rust
1481 /// # tokio_test::block_on(async {
1482 /// # use linera_views::context::MemoryContext;
1483 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1484 /// # use linera_views::register_view::RegisterView;
1485 /// # use linera_views::views::View;
1486 /// # let context = MemoryContext::new_for_testing(());
1487 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1488 /// ReentrantCollectionView::load(context).await.unwrap();
1489 /// view.try_load_entry_mut(&23).await.unwrap();
1490 /// view.try_load_entry_mut(&24).await.unwrap();
1491 /// let mut count = 0;
1492 /// view.for_each_index_while(|_key| {
1493 /// count += 1;
1494 /// Ok(count < 1)
1495 /// })
1496 /// .await
1497 /// .unwrap();
1498 /// assert_eq!(count, 1);
1499 /// # })
1500 /// ```
1501 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
1502 where
1503 F: FnMut(I) -> Result<bool, ViewError> + Send,
1504 {
1505 self.collection
1506 .for_each_key_while(|key| {
1507 let index = BaseKey::deserialize_value(key)?;
1508 f(index)
1509 })
1510 .await?;
1511 Ok(())
1512 }
1513
1514 /// Applies a function `f` on each index. Indices are visited in an order
1515 /// determined by the serialization.
1516 /// ```rust
1517 /// # tokio_test::block_on(async {
1518 /// # use linera_views::context::MemoryContext;
1519 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1520 /// # use linera_views::register_view::RegisterView;
1521 /// # use linera_views::views::View;
1522 /// # let context = MemoryContext::new_for_testing(());
1523 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1524 /// ReentrantCollectionView::load(context).await.unwrap();
1525 /// view.try_load_entry_mut(&23).await.unwrap();
1526 /// view.try_load_entry_mut(&28).await.unwrap();
1527 /// let mut count = 0;
1528 /// view.for_each_index(|_key| {
1529 /// count += 1;
1530 /// Ok(())
1531 /// })
1532 /// .await
1533 /// .unwrap();
1534 /// assert_eq!(count, 2);
1535 /// # })
1536 /// ```
1537 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
1538 where
1539 F: FnMut(I) -> Result<(), ViewError> + Send,
1540 {
1541 self.collection
1542 .for_each_key(|key| {
1543 let index = BaseKey::deserialize_value(key)?;
1544 f(index)
1545 })
1546 .await?;
1547 Ok(())
1548 }
1549}
1550
1551impl<I, W> HashableView for ReentrantCollectionView<W::Context, I, W>
1552where
1553 W: HashableView,
1554 I: Send + Sync + Serialize + DeserializeOwned,
1555{
1556 type Hasher = sha3::Sha3_256;
1557
1558 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1559 self.collection.hash_mut().await
1560 }
1561
1562 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1563 self.collection.hash().await
1564 }
1565}
1566
1567/// A view that supports accessing a collection of views of the same kind, indexed by an ordered key,
1568/// possibly several subviews at a time.
1569#[derive(Debug)]
1570pub struct ReentrantCustomCollectionView<C, I, W> {
1571 collection: ReentrantByteCollectionView<C, W>,
1572 _phantom: PhantomData<I>,
1573}
1574
1575impl<I, W> View for ReentrantCustomCollectionView<W::Context, I, W>
1576where
1577 W: View,
1578 I: Send + Sync + CustomSerialize,
1579{
1580 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1581
1582 type Context = W::Context;
1583
1584 fn context(&self) -> &Self::Context {
1585 self.collection.context()
1586 }
1587
1588 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1589 ReentrantByteCollectionView::<_, W>::pre_load(context)
1590 }
1591
1592 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1593 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1594 Ok(ReentrantCustomCollectionView {
1595 collection,
1596 _phantom: PhantomData,
1597 })
1598 }
1599
1600 async fn load(context: Self::Context) -> Result<Self, ViewError> {
1601 Self::post_load(context, &[])
1602 }
1603
1604 fn rollback(&mut self) {
1605 self.collection.rollback()
1606 }
1607
1608 async fn has_pending_changes(&self) -> bool {
1609 self.collection.has_pending_changes().await
1610 }
1611
1612 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1613 self.collection.flush(batch)
1614 }
1615
1616 fn clear(&mut self) {
1617 self.collection.clear()
1618 }
1619}
1620
1621impl<I, W> ClonableView for ReentrantCustomCollectionView<W::Context, I, W>
1622where
1623 W: ClonableView,
1624 Self: View,
1625{
1626 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
1627 Ok(ReentrantCustomCollectionView {
1628 collection: self.collection.clone_unchecked()?,
1629 _phantom: PhantomData,
1630 })
1631 }
1632}
1633
1634impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
1635where
1636 W: View,
1637 I: Sync + Clone + Send + CustomSerialize,
1638{
1639 /// Loads a subview for the data at the given index in the collection. If an entry
1640 /// is absent then a default entry is added to the collection at this index.
1641 /// ```rust
1642 /// # tokio_test::block_on(async {
1643 /// # use linera_views::context::MemoryContext;
1644 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1645 /// # use linera_views::register_view::RegisterView;
1646 /// # use linera_views::views::View;
1647 /// # let context = MemoryContext::new_for_testing(());
1648 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1649 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1650 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1651 /// let value = subview.get();
1652 /// assert_eq!(*value, String::default());
1653 /// # })
1654 /// ```
1655 pub async fn try_load_entry_mut<Q>(
1656 &mut self,
1657 index: &Q,
1658 ) -> Result<WriteGuardedView<W>, ViewError>
1659 where
1660 I: Borrow<Q>,
1661 Q: CustomSerialize,
1662 {
1663 let short_key = index.to_custom_bytes()?;
1664 self.collection.try_load_entry_mut(&short_key).await
1665 }
1666
1667 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1668 /// If an entry is absent then `None` is returned.
1669 /// ```rust
1670 /// # tokio_test::block_on(async {
1671 /// # use linera_views::context::MemoryContext;
1672 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1673 /// # use linera_views::register_view::RegisterView;
1674 /// # use linera_views::views::View;
1675 /// # let context = MemoryContext::new_for_testing(());
1676 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1677 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1678 /// {
1679 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1680 /// }
1681 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1682 /// let value = subview.get();
1683 /// assert_eq!(*value, String::default());
1684 /// # })
1685 /// ```
1686 pub async fn try_load_entry<Q>(
1687 &self,
1688 index: &Q,
1689 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1690 where
1691 I: Borrow<Q>,
1692 Q: CustomSerialize,
1693 {
1694 let short_key = index.to_custom_bytes()?;
1695 self.collection.try_load_entry(&short_key).await
1696 }
1697
1698 /// Returns `true` if the collection contains a value for the specified key.
1699 /// ```rust
1700 /// # tokio_test::block_on(async {
1701 /// # use linera_views::context::MemoryContext;
1702 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1703 /// # use linera_views::register_view::RegisterView;
1704 /// # use linera_views::views::View;
1705 /// # let context = MemoryContext::new_for_testing(());
1706 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1707 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1708 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1709 /// assert!(view.contains_key(&23).await.unwrap());
1710 /// assert!(!view.contains_key(&24).await.unwrap());
1711 /// # })
1712 /// ```
1713 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1714 where
1715 I: Borrow<Q>,
1716 Q: CustomSerialize,
1717 {
1718 let short_key = index.to_custom_bytes()?;
1719 self.collection.contains_key(&short_key).await
1720 }
1721
1722 /// Removes an entry. If absent then nothing happens.
1723 /// ```rust
1724 /// # tokio_test::block_on(async {
1725 /// # use linera_views::context::MemoryContext;
1726 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1727 /// # use linera_views::register_view::RegisterView;
1728 /// # use linera_views::views::View;
1729 /// # let context = MemoryContext::new_for_testing(());
1730 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1731 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1732 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1733 /// let value = subview.get_mut();
1734 /// assert_eq!(*value, String::default());
1735 /// view.remove_entry(&23).unwrap();
1736 /// let keys = view.indices().await.unwrap();
1737 /// assert_eq!(keys.len(), 0);
1738 /// # })
1739 /// ```
1740 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1741 where
1742 I: Borrow<Q>,
1743 Q: CustomSerialize,
1744 {
1745 let short_key = index.to_custom_bytes()?;
1746 self.collection.remove_entry(short_key);
1747 Ok(())
1748 }
1749
1750 /// Marks the entry so that it is reset to its default value in the next flush.
1751 /// ```rust
1752 /// # tokio_test::block_on(async {
1753 /// # use linera_views::context::MemoryContext;
1754 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1755 /// # use linera_views::register_view::RegisterView;
1756 /// # use linera_views::views::View;
1757 /// # let context = MemoryContext::new_for_testing(());
1758 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1759 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1760 /// {
1761 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1762 /// let value = subview.get_mut();
1763 /// *value = String::from("Hello");
1764 /// }
1765 /// {
1766 /// view.try_reset_entry_to_default(&23).unwrap();
1767 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1768 /// let value = subview.get();
1769 /// assert_eq!(*value, String::default());
1770 /// }
1771 /// # })
1772 /// ```
1773 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1774 where
1775 I: Borrow<Q>,
1776 Q: CustomSerialize,
1777 {
1778 let short_key = index.to_custom_bytes()?;
1779 self.collection.try_reset_entry_to_default(&short_key)
1780 }
1781
1782 /// Gets the extra data.
1783 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1784 self.collection.extra()
1785 }
1786}
1787
1788impl<I, W: View> ReentrantCustomCollectionView<W::Context, I, W>
1789where
1790 I: Sync + Clone + Send + CustomSerialize,
1791{
1792 /// Loads multiple entries for writing at once.
1793 /// The given indices must all be distinct.
1794 /// ```rust
1795 /// # tokio_test::block_on(async {
1796 /// # use linera_views::context::MemoryContext;
1797 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1798 /// # use linera_views::register_view::RegisterView;
1799 /// # use linera_views::views::View;
1800 /// # let context = MemoryContext::new_for_testing(());
1801 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1802 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1803 /// let indices = vec![23, 42];
1804 /// let subviews = view.try_load_entries_mut(indices).await.unwrap();
1805 /// let value1 = subviews[0].get();
1806 /// let value2 = subviews[1].get();
1807 /// assert_eq!(*value1, String::default());
1808 /// assert_eq!(*value2, String::default());
1809 /// # })
1810 /// ```
1811 pub async fn try_load_entries_mut<Q>(
1812 &mut self,
1813 indices: impl IntoIterator<Item = Q>,
1814 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1815 where
1816 I: Borrow<Q>,
1817 Q: CustomSerialize,
1818 {
1819 let short_keys = indices
1820 .into_iter()
1821 .map(|index| index.to_custom_bytes())
1822 .collect::<Result<_, _>>()?;
1823 self.collection.try_load_entries_mut(short_keys).await
1824 }
1825
1826 /// Loads multiple entries for reading at once.
1827 /// The given indices must all be distinct.
1828 /// ```rust
1829 /// # tokio_test::block_on(async {
1830 /// # use linera_views::context::MemoryContext;
1831 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1832 /// # use linera_views::register_view::RegisterView;
1833 /// # use linera_views::views::View;
1834 /// # let context = MemoryContext::new_for_testing(());
1835 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1836 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1837 /// {
1838 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1839 /// }
1840 /// let indices = vec![23, 42];
1841 /// let subviews = view.try_load_entries(indices).await.unwrap();
1842 /// assert!(subviews[1].is_none());
1843 /// let value0 = subviews[0].as_ref().unwrap().get();
1844 /// assert_eq!(*value0, String::default());
1845 /// # })
1846 /// ```
1847 pub async fn try_load_entries<Q>(
1848 &self,
1849 indices: impl IntoIterator<Item = Q>,
1850 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1851 where
1852 I: Borrow<Q>,
1853 Q: CustomSerialize,
1854 {
1855 let short_keys = indices
1856 .into_iter()
1857 .map(|index| index.to_custom_bytes())
1858 .collect::<Result<_, _>>()?;
1859 self.collection.try_load_entries(short_keys).await
1860 }
1861
1862 /// Loads all the entries of the collection for writing at once.
1863 /// The entries are returned together with their indices.
1864 /// ```rust
1865 /// # tokio_test::block_on(async {
1866 /// # use linera_views::context::MemoryContext;
1867 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1868 /// # use linera_views::register_view::RegisterView;
1869 /// # use linera_views::views::View;
1870 /// # let context = MemoryContext::new_for_testing(());
1871 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1872 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1873 /// {
1874 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1875 /// }
1876 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
1877 /// assert_eq!(subviews.len(), 1);
1878 /// # })
1879 /// ```
1880 pub async fn try_load_all_entries_mut<Q>(
1881 &mut self,
1882 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError>
1883 where
1884 I: Borrow<Q>,
1885 Q: CustomSerialize,
1886 {
1887 let results = self.collection.try_load_all_entries_mut().await?;
1888 results
1889 .into_iter()
1890 .map(|(short_key, view)| {
1891 let index = I::from_custom_bytes(&short_key)?;
1892 Ok((index, view))
1893 })
1894 .collect()
1895 }
1896
1897 /// Loads all the entries of the collection for reading at once.
1898 /// The entries are returned together with their indices.
1899 /// ```rust
1900 /// # tokio_test::block_on(async {
1901 /// # use linera_views::context::MemoryContext;
1902 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1903 /// # use linera_views::register_view::RegisterView;
1904 /// # use linera_views::views::View;
1905 /// # let context = MemoryContext::new_for_testing(());
1906 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1907 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1908 /// {
1909 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1910 /// }
1911 /// let subviews = view.try_load_all_entries().await.unwrap();
1912 /// assert_eq!(subviews.len(), 1);
1913 /// # })
1914 /// ```
1915 pub async fn try_load_all_entries<Q>(&self) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError>
1916 where
1917 I: Borrow<Q>,
1918 Q: CustomSerialize,
1919 {
1920 let results = self.collection.try_load_all_entries().await?;
1921 results
1922 .into_iter()
1923 .map(|(short_key, view)| {
1924 let index = I::from_custom_bytes(&short_key)?;
1925 Ok((index, view))
1926 })
1927 .collect()
1928 }
1929}
1930
1931impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
1932where
1933 W: View,
1934 I: Sync + Clone + Send + CustomSerialize,
1935{
1936 /// Returns the list of indices in the collection. The order is determined by
1937 /// the custom serialization.
1938 /// ```rust
1939 /// # tokio_test::block_on(async {
1940 /// # use linera_views::context::MemoryContext;
1941 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1942 /// # use linera_views::register_view::RegisterView;
1943 /// # use linera_views::views::View;
1944 /// # let context = MemoryContext::new_for_testing(());
1945 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1946 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1947 /// view.try_load_entry_mut(&23).await.unwrap();
1948 /// view.try_load_entry_mut(&25).await.unwrap();
1949 /// let indices = view.indices().await.unwrap();
1950 /// assert_eq!(indices, vec![23, 25]);
1951 /// # })
1952 /// ```
1953 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1954 let mut indices = Vec::new();
1955 self.for_each_index(|index| {
1956 indices.push(index);
1957 Ok(())
1958 })
1959 .await?;
1960 Ok(indices)
1961 }
1962
1963 /// Returns the number of entries in the collection.
1964 /// ```rust
1965 /// # tokio_test::block_on(async {
1966 /// # use linera_views::context::MemoryContext;
1967 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1968 /// # use linera_views::register_view::RegisterView;
1969 /// # use linera_views::views::View;
1970 /// # let context = MemoryContext::new_for_testing(());
1971 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1972 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1973 /// view.try_load_entry_mut(&23).await.unwrap();
1974 /// view.try_load_entry_mut(&25).await.unwrap();
1975 /// assert_eq!(view.count().await.unwrap(), 2);
1976 /// # })
1977 /// ```
1978 pub async fn count(&self) -> Result<usize, ViewError> {
1979 self.collection.count().await
1980 }
1981
1982 /// Applies a function f on each index. Indices are visited in an order
1983 /// determined by the custom serialization. If the function f returns false
1984 /// then the loop ends prematurely.
1985 /// ```rust
1986 /// # tokio_test::block_on(async {
1987 /// # use linera_views::context::MemoryContext;
1988 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1989 /// # use linera_views::register_view::RegisterView;
1990 /// # use linera_views::views::View;
1991 /// # let context = MemoryContext::new_for_testing(());
1992 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1993 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1994 /// view.try_load_entry_mut(&28).await.unwrap();
1995 /// view.try_load_entry_mut(&24).await.unwrap();
1996 /// view.try_load_entry_mut(&23).await.unwrap();
1997 /// let mut part_indices = Vec::new();
1998 /// view.for_each_index_while(|index| {
1999 /// part_indices.push(index);
2000 /// Ok(part_indices.len() < 2)
2001 /// })
2002 /// .await
2003 /// .unwrap();
2004 /// assert_eq!(part_indices, vec![23, 24]);
2005 /// # })
2006 /// ```
2007 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
2008 where
2009 F: FnMut(I) -> Result<bool, ViewError> + Send,
2010 {
2011 self.collection
2012 .for_each_key_while(|key| {
2013 let index = I::from_custom_bytes(key)?;
2014 f(index)
2015 })
2016 .await?;
2017 Ok(())
2018 }
2019
2020 /// Applies a function f on each index. Indices are visited in an order
2021 /// determined by the custom serialization.
2022 /// ```rust
2023 /// # tokio_test::block_on(async {
2024 /// # use linera_views::context::MemoryContext;
2025 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2026 /// # use linera_views::register_view::RegisterView;
2027 /// # use linera_views::views::View;
2028 /// # let context = MemoryContext::new_for_testing(());
2029 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2030 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2031 /// view.try_load_entry_mut(&28).await.unwrap();
2032 /// view.try_load_entry_mut(&24).await.unwrap();
2033 /// view.try_load_entry_mut(&23).await.unwrap();
2034 /// let mut indices = Vec::new();
2035 /// view.for_each_index(|index| {
2036 /// indices.push(index);
2037 /// Ok(())
2038 /// })
2039 /// .await
2040 /// .unwrap();
2041 /// assert_eq!(indices, vec![23, 24, 28]);
2042 /// # })
2043 /// ```
2044 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
2045 where
2046 F: FnMut(I) -> Result<(), ViewError> + Send,
2047 {
2048 self.collection
2049 .for_each_key(|key| {
2050 let index = I::from_custom_bytes(key)?;
2051 f(index)
2052 })
2053 .await?;
2054 Ok(())
2055 }
2056}
2057
2058impl<I, W> HashableView for ReentrantCustomCollectionView<W::Context, I, W>
2059where
2060 W: HashableView,
2061 I: Send + Sync + CustomSerialize,
2062{
2063 type Hasher = sha3::Sha3_256;
2064
2065 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2066 self.collection.hash_mut().await
2067 }
2068
2069 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2070 self.collection.hash().await
2071 }
2072}
2073
2074/// Type wrapping `ReentrantByteCollectionView` while memoizing the hash.
2075pub type HashedReentrantByteCollectionView<C, W> =
2076 WrappedHashableContainerView<C, ReentrantByteCollectionView<C, W>, HasherOutput>;
2077
2078/// Type wrapping `ReentrantCollectionView` while memoizing the hash.
2079pub type HashedReentrantCollectionView<C, I, W> =
2080 WrappedHashableContainerView<C, ReentrantCollectionView<C, I, W>, HasherOutput>;
2081
2082/// Type wrapping `ReentrantCustomCollectionView` while memoizing the hash.
2083pub type HashedReentrantCustomCollectionView<C, I, W> =
2084 WrappedHashableContainerView<C, ReentrantCustomCollectionView<C, I, W>, HasherOutput>;
2085
2086#[cfg(with_graphql)]
2087mod graphql {
2088 use std::borrow::Cow;
2089
2090 use super::{ReadGuardedView, ReentrantCollectionView};
2091 use crate::{
2092 graphql::{hash_name, mangle, missing_key_error, Entry, MapFilters, MapInput},
2093 views::View,
2094 };
2095
2096 impl<T: async_graphql::OutputType> async_graphql::OutputType for ReadGuardedView<T> {
2097 fn type_name() -> Cow<'static, str> {
2098 T::type_name()
2099 }
2100
2101 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
2102 T::create_type_info(registry)
2103 }
2104
2105 async fn resolve(
2106 &self,
2107 ctx: &async_graphql::ContextSelectionSet<'_>,
2108 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
2109 ) -> async_graphql::ServerResult<async_graphql::Value> {
2110 (**self).resolve(ctx, field).await
2111 }
2112 }
2113
2114 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2115 async_graphql::TypeName for ReentrantCollectionView<C, K, V>
2116 {
2117 fn type_name() -> Cow<'static, str> {
2118 format!(
2119 "ReentrantCollectionView_{}_{}_{}",
2120 mangle(K::type_name()),
2121 mangle(V::type_name()),
2122 hash_name::<(K, V)>(),
2123 )
2124 .into()
2125 }
2126 }
2127
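    // GraphQL resolvers for the reentrant collection: `keys` lists every index, `entry` returns a
    // single read-guarded subview, and `entries` returns several, optionally restricted by the key
    // filter in `MapInput`.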
2128 #[async_graphql::Object(cache_control(no_cache), name_type)]
2129 impl<K, V> ReentrantCollectionView<V::Context, K, V>
2130 where
2131 K: async_graphql::InputType
2132 + async_graphql::OutputType
2133 + serde::ser::Serialize
2134 + serde::de::DeserializeOwned
2135 + std::fmt::Debug
2136 + Clone,
2137 V: View + async_graphql::OutputType,
2138 MapInput<K>: async_graphql::InputType,
2139 MapFilters<K>: async_graphql::InputType,
2140 {
2141 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2142 Ok(self.indices().await?)
2143 }
2144
2145 async fn entry(
2146 &self,
2147 key: K,
2148 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2149 let value = self
2150 .try_load_entry(&key)
2151 .await?
2152 .ok_or_else(|| missing_key_error(&key))?;
2153 Ok(Entry { value, key })
2154 }
2155
2156 async fn entries(
2157 &self,
2158 input: Option<MapInput<K>>,
2159 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
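            // Use the caller-provided key filter when present; otherwise enumerate all indices.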
2160 let keys = if let Some(keys) = input
2161 .and_then(|input| input.filters)
2162 .and_then(|filters| filters.keys)
2163 {
2164 keys
2165 } else {
2166 self.indices().await?
2167 };
2168
2169 let mut values = vec![];
2170 for key in keys {
2171 let value = self
2172 .try_load_entry(&key)
2173 .await?
2174 .ok_or_else(|| missing_key_error(&key))?;
2175 values.push(Entry { value, key })
2176 }
2177
2178 Ok(values)
2179 }
2180 }
2181
2182 use crate::reentrant_collection_view::ReentrantCustomCollectionView;
2183 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2184 async_graphql::TypeName for ReentrantCustomCollectionView<C, K, V>
2185 {
2186 fn type_name() -> Cow<'static, str> {
2187 format!(
2188 "ReentrantCustomCollectionView_{}_{}_{:08x}",
2189 mangle(K::type_name()),
2190 mangle(V::type_name()),
2191 hash_name::<(K, V)>(),
2192 )
2193 .into()
2194 }
2195 }
2196
2197 #[async_graphql::Object(cache_control(no_cache), name_type)]
2198 impl<K, V> ReentrantCustomCollectionView<V::Context, K, V>
2199 where
2200 K: async_graphql::InputType
2201 + async_graphql::OutputType
2202 + crate::common::CustomSerialize
2203 + std::fmt::Debug
2204 + Clone,
2205 V: View + async_graphql::OutputType,
2206 MapInput<K>: async_graphql::InputType,
2207 MapFilters<K>: async_graphql::InputType,
2208 {
2209 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2210 Ok(self.indices().await?)
2211 }
2212
2213 async fn entry(
2214 &self,
2215 key: K,
2216 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2217 let value = self
2218 .try_load_entry(&key)
2219 .await?
2220 .ok_or_else(|| missing_key_error(&key))?;
2221 Ok(Entry { value, key })
2222 }
2223
2224 async fn entries(
2225 &self,
2226 input: Option<MapInput<K>>,
2227 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
2228 let keys = if let Some(keys) = input
2229 .and_then(|input| input.filters)
2230 .and_then(|filters| filters.keys)
2231 {
2232 keys
2233 } else {
2234 self.indices().await?
2235 };
2236
2237 let mut values = vec![];
2238 for key in keys {
2239 let value = self
2240 .try_load_entry(&key)
2241 .await?
2242 .ok_or_else(|| missing_key_error(&key))?;
2243 values.push(Entry { value, key })
2244 }
2245
2246 Ok(values)
2247 }
2248 }
2249}