linera_views/views/reentrant_collection_view.rs
1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5 borrow::Borrow,
6 collections::{btree_map, BTreeMap},
7 io::Write,
8 marker::PhantomData,
9 mem,
10 sync::Arc,
11};
12
13use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc};
14#[cfg(with_metrics)]
15use linera_base::prometheus_util::MeasureLatency as _;
16use serde::{de::DeserializeOwned, Serialize};
17
18use crate::{
19 batch::Batch,
20 common::{CustomSerialize, HasherOutput, SliceExt as _, Update},
21 context::{BaseKey, Context},
22 hashable_wrapper::WrappedHashableContainerView,
23 historical_hash_wrapper::HistoricallyHashableView,
24 store::ReadableKeyValueStore as _,
25 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
26};
27
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram of the runtime of the hash computation.
    ///
    /// The histogram is registered lazily, the first time the static is accessed.
    pub static REENTRANT_COLLECTION_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            let buckets = exponential_bucket_latencies(5.0);
            register_histogram_vec(
                "reentrant_collection_view_hash_runtime",
                "ReentrantCollectionView hash runtime",
                &[],
                buckets,
            )
        });
}
46
47/// A read-only accessor for a particular subview in a [`ReentrantCollectionView`].
48#[derive(Debug)]
49pub struct ReadGuardedView<T>(RwLockReadGuardArc<T>);
50
51impl<T> std::ops::Deref for ReadGuardedView<T> {
52 type Target = T;
53 fn deref(&self) -> &T {
54 self.0.deref()
55 }
56}
57
58/// A read-write accessor for a particular subview in a [`ReentrantCollectionView`].
59#[derive(Debug)]
60pub struct WriteGuardedView<T>(RwLockWriteGuardArc<T>);
61
62impl<T> std::ops::Deref for WriteGuardedView<T> {
63 type Target = T;
64 fn deref(&self) -> &T {
65 self.0.deref()
66 }
67}
68
69impl<T> std::ops::DerefMut for WriteGuardedView<T> {
70 fn deref_mut(&mut self) -> &mut T {
71 self.0.deref_mut()
72 }
73}
74
/// A view that supports accessing a collection of views of the same kind, indexed by `Vec<u8>`,
/// possibly several subviews at a time.
///
/// Every staged subview is kept behind an `Arc<RwLock<_>>`.
#[derive(Debug)]
pub struct ReentrantByteCollectionView<C, W> {
    /// The view [`Context`].
    context: C,
    /// Whether the current persisted data will be completely erased and replaced on the next
    /// flush.
    delete_storage_first: bool,
    /// Entries that may have staged changes, indexed by their short key: either a subview
    /// held in memory (`Update::Set`) or a removal marker (`Update::Removed`).
    updates: BTreeMap<Vec<u8>, Update<Arc<RwLock<W>>>>,
}
86
87impl<W, C2> ReplaceContext<C2> for ReentrantByteCollectionView<W::Context, W>
88where
89 W: View + ReplaceContext<C2>,
90 C2: Context,
91{
92 type Target = ReentrantByteCollectionView<C2, <W as ReplaceContext<C2>>::Target>;
93
94 async fn with_context(
95 &mut self,
96 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
97 ) -> Self::Target {
98 let mut updates: BTreeMap<_, Update<Arc<RwLock<W::Target>>>> = BTreeMap::new();
99 for (key, update) in &self.updates {
100 let new_value = match update {
101 Update::Removed => Update::Removed,
102 Update::Set(x) => Update::Set(Arc::new(RwLock::new(
103 x.write().await.with_context(ctx.clone()).await,
104 ))),
105 };
106 updates.insert(key.clone(), new_value);
107 }
108 ReentrantByteCollectionView {
109 context: ctx(self.context()),
110 delete_storage_first: self.delete_storage_first,
111 updates,
112 }
113 }
114}
115
/// We need to find new base keys in order to implement the collection view.
/// We do this by appending a value to the base key.
///
/// Sub-views in a collection share a common key prefix, like in other view types. However,
/// just concatenating the shared prefix with sub-view keys makes it impossible to distinguish if a
/// given key belongs to a child sub-view or a grandchild sub-view (consider for example if a
/// collection is stored inside the collection).
#[repr(u8)]
enum KeyTag {
    /// Prefix for the index keys. The presence of an index key indicates that the
    /// corresponding entry exists in the collection.
    Index = MIN_VIEW_TAG,
    /// Prefix for the keys of the sub-views.
    Subview,
}
130
131impl<W: View> View for ReentrantByteCollectionView<W::Context, W> {
132 const NUM_INIT_KEYS: usize = 0;
133
134 type Context = W::Context;
135
136 fn context(&self) -> &Self::Context {
137 &self.context
138 }
139
140 fn pre_load(_context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
141 Ok(Vec::new())
142 }
143
144 fn post_load(context: Self::Context, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
145 Ok(Self {
146 context,
147 delete_storage_first: false,
148 updates: BTreeMap::new(),
149 })
150 }
151
152 fn rollback(&mut self) {
153 self.delete_storage_first = false;
154 self.updates.clear();
155 }
156
157 async fn has_pending_changes(&self) -> bool {
158 if self.delete_storage_first {
159 return true;
160 }
161 !self.updates.is_empty()
162 }
163
164 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
165 let mut delete_view = false;
166 if self.delete_storage_first {
167 delete_view = true;
168 batch.delete_key_prefix(self.context.base_key().bytes.clone());
169 for (index, update) in mem::take(&mut self.updates) {
170 if let Update::Set(view) = update {
171 let mut view = Arc::try_unwrap(view)
172 .map_err(|_| ViewError::TryLockError(index.clone()))?
173 .into_inner();
174 view.flush(batch)?;
175 self.add_index(batch, &index);
176 delete_view = false;
177 }
178 }
179 } else {
180 for (index, update) in mem::take(&mut self.updates) {
181 match update {
182 Update::Set(view) => {
183 let mut view = Arc::try_unwrap(view)
184 .map_err(|_| ViewError::TryLockError(index.clone()))?
185 .into_inner();
186 view.flush(batch)?;
187 self.add_index(batch, &index);
188 }
189 Update::Removed => {
190 let key_subview = self.get_subview_key(&index);
191 let key_index = self.get_index_key(&index);
192 batch.delete_key(key_index);
193 batch.delete_key_prefix(key_subview);
194 }
195 }
196 }
197 }
198 self.delete_storage_first = false;
199 Ok(delete_view)
200 }
201
202 fn clear(&mut self) {
203 self.delete_storage_first = true;
204 self.updates.clear();
205 }
206}
207
208impl<W: ClonableView> ClonableView for ReentrantByteCollectionView<W::Context, W> {
209 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
210 let cloned_updates = self
211 .updates
212 .iter()
213 .map(|(key, value)| {
214 let cloned_value = match value {
215 Update::Removed => Update::Removed,
216 Update::Set(view_lock) => {
217 let mut view = view_lock
218 .try_write()
219 .ok_or_else(|| ViewError::TryLockError(key.clone()))?;
220 Update::Set(Arc::new(RwLock::new(view.clone_unchecked()?)))
221 }
222 };
223 Ok::<_, ViewError>((key.clone(), cloned_value))
224 })
225 .collect::<Result<_, _>>()?;
226
227 Ok(ReentrantByteCollectionView {
228 context: self.context.clone(),
229 delete_storage_first: self.delete_storage_first,
230 updates: cloned_updates,
231 })
232 }
233}
234
235impl<C: Context, W> ReentrantByteCollectionView<C, W> {
236 fn get_index_key(&self, index: &[u8]) -> Vec<u8> {
237 self.context
238 .base_key()
239 .base_tag_index(KeyTag::Index as u8, index)
240 }
241
242 fn get_subview_key(&self, index: &[u8]) -> Vec<u8> {
243 self.context
244 .base_key()
245 .base_tag_index(KeyTag::Subview as u8, index)
246 }
247
248 fn add_index(&self, batch: &mut Batch, index: &[u8]) {
249 let key = self.get_index_key(index);
250 batch.put_key_value_bytes(key, vec![]);
251 }
252}
253
254impl<W: View> ReentrantByteCollectionView<W::Context, W> {
255 /// Reads the view and if missing returns the default view
256 async fn wrapped_view(
257 context: &W::Context,
258 delete_storage_first: bool,
259 short_key: &[u8],
260 ) -> Result<Arc<RwLock<W>>, ViewError> {
261 let key = context
262 .base_key()
263 .base_tag_index(KeyTag::Subview as u8, short_key);
264 let context = context.clone_with_base_key(key);
265 // Obtain a view and set its pending state to the default (e.g. empty) state
266 let view = if delete_storage_first {
267 W::new(context)?
268 } else {
269 W::load(context).await?
270 };
271 Ok(Arc::new(RwLock::new(view)))
272 }
273
274 /// Load the view and insert it into the updates if needed.
275 /// If the entry is missing, then it is set to default.
276 async fn try_load_view_mut(&mut self, short_key: &[u8]) -> Result<Arc<RwLock<W>>, ViewError> {
277 use btree_map::Entry::*;
278 Ok(match self.updates.entry(short_key.to_owned()) {
279 Occupied(mut entry) => match entry.get_mut() {
280 Update::Set(view) => view.clone(),
281 entry @ Update::Removed => {
282 let wrapped_view = Self::wrapped_view(&self.context, true, short_key).await?;
283 *entry = Update::Set(wrapped_view.clone());
284 wrapped_view
285 }
286 },
287 Vacant(entry) => {
288 let wrapped_view =
289 Self::wrapped_view(&self.context, self.delete_storage_first, short_key).await?;
290 entry.insert(Update::Set(wrapped_view.clone()));
291 wrapped_view
292 }
293 })
294 }
295
296 /// Load the view from the update is available.
297 /// If missing, then the entry is loaded from storage and if
298 /// missing there an error is reported.
299 async fn try_load_view(&self, short_key: &[u8]) -> Result<Option<Arc<RwLock<W>>>, ViewError> {
300 Ok(if let Some(entry) = self.updates.get(short_key) {
301 match entry {
302 Update::Set(view) => Some(view.clone()),
303 _entry @ Update::Removed => None,
304 }
305 } else if self.delete_storage_first {
306 None
307 } else {
308 let key_index = self
309 .context
310 .base_key()
311 .base_tag_index(KeyTag::Index as u8, short_key);
312 if self.context.store().contains_key(&key_index).await? {
313 let view = Self::wrapped_view(&self.context, false, short_key).await?;
314 Some(view)
315 } else {
316 None
317 }
318 })
319 }
320
321 /// Loads a subview for the data at the given index in the collection. If an entry
322 /// is absent then a default entry is added to the collection. The resulting view
323 /// can be modified.
324 /// ```rust
325 /// # tokio_test::block_on(async {
326 /// # use linera_views::context::MemoryContext;
327 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
328 /// # use linera_views::register_view::RegisterView;
329 /// # use linera_views::views::View;
330 /// # let context = MemoryContext::new_for_testing(());
331 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
332 /// ReentrantByteCollectionView::load(context).await.unwrap();
333 /// let subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
334 /// let value = subview.get();
335 /// assert_eq!(*value, String::default());
336 /// # })
337 /// ```
338 pub async fn try_load_entry_mut(
339 &mut self,
340 short_key: &[u8],
341 ) -> Result<WriteGuardedView<W>, ViewError> {
342 Ok(WriteGuardedView(
343 self.try_load_view_mut(short_key)
344 .await?
345 .try_write_arc()
346 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
347 ))
348 }
349
350 /// Loads a subview at the given index in the collection and gives read-only access to the data.
351 /// If an entry is absent then `None` is returned.
352 /// ```rust
353 /// # tokio_test::block_on(async {
354 /// # use linera_views::context::MemoryContext;
355 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
356 /// # use linera_views::register_view::RegisterView;
357 /// # use linera_views::views::View;
358 /// # let context = MemoryContext::new_for_testing(());
359 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
360 /// ReentrantByteCollectionView::load(context).await.unwrap();
361 /// {
362 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
363 /// }
364 /// let subview = view.try_load_entry(&[0, 1]).await.unwrap().unwrap();
365 /// let value = subview.get();
366 /// assert_eq!(*value, String::default());
367 /// # })
368 /// ```
369 pub async fn try_load_entry(
370 &self,
371 short_key: &[u8],
372 ) -> Result<Option<ReadGuardedView<W>>, ViewError> {
373 match self.try_load_view(short_key).await? {
374 None => Ok(None),
375 Some(view) => Ok(Some(ReadGuardedView(
376 view.try_read_arc()
377 .ok_or_else(|| ViewError::TryLockError(short_key.to_vec()))?,
378 ))),
379 }
380 }
381
382 /// Returns `true` if the collection contains a value for the specified key.
383 /// ```rust
384 /// # tokio_test::block_on(async {
385 /// # use linera_views::context::MemoryContext;
386 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
387 /// # use linera_views::register_view::RegisterView;
388 /// # use linera_views::views::View;
389 /// # let context = MemoryContext::new_for_testing(());
390 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
391 /// ReentrantByteCollectionView::load(context).await.unwrap();
392 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
393 /// assert!(view.contains_key(&[0, 1]).await.unwrap());
394 /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
395 /// # })
396 /// ```
397 pub async fn contains_key(&self, short_key: &[u8]) -> Result<bool, ViewError> {
398 Ok(if let Some(entry) = self.updates.get(short_key) {
399 match entry {
400 Update::Set(_view) => true,
401 Update::Removed => false,
402 }
403 } else if self.delete_storage_first {
404 false
405 } else {
406 let key_index = self
407 .context
408 .base_key()
409 .base_tag_index(KeyTag::Index as u8, short_key);
410 self.context.store().contains_key(&key_index).await?
411 })
412 }
413
414 /// Removes an entry. If absent then nothing happens.
415 /// ```rust
416 /// # tokio_test::block_on(async {
417 /// # use linera_views::context::MemoryContext;
418 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
419 /// # use linera_views::register_view::RegisterView;
420 /// # use linera_views::views::View;
421 /// # let context = MemoryContext::new_for_testing(());
422 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
423 /// ReentrantByteCollectionView::load(context).await.unwrap();
424 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
425 /// let value = subview.get_mut();
426 /// assert_eq!(*value, String::default());
427 /// view.remove_entry(vec![0, 1]);
428 /// let keys = view.keys().await.unwrap();
429 /// assert_eq!(keys.len(), 0);
430 /// # })
431 /// ```
432 pub fn remove_entry(&mut self, short_key: Vec<u8>) {
433 if self.delete_storage_first {
434 // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
435 self.updates.remove(&short_key);
436 } else {
437 self.updates.insert(short_key, Update::Removed);
438 }
439 }
440
441 /// Marks the entry so that it is removed in the next flush.
442 /// ```rust
443 /// # tokio_test::block_on(async {
444 /// # use linera_views::context::MemoryContext;
445 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
446 /// # use linera_views::register_view::RegisterView;
447 /// # use linera_views::views::View;
448 /// # let context = MemoryContext::new_for_testing(());
449 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
450 /// ReentrantByteCollectionView::load(context).await.unwrap();
451 /// {
452 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
453 /// let value = subview.get_mut();
454 /// *value = String::from("Hello");
455 /// }
456 /// view.try_reset_entry_to_default(&[0, 1]).unwrap();
457 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
458 /// let value = subview.get_mut();
459 /// assert_eq!(*value, String::default());
460 /// # })
461 /// ```
462 pub fn try_reset_entry_to_default(&mut self, short_key: &[u8]) -> Result<(), ViewError> {
463 let key = self
464 .context
465 .base_key()
466 .base_tag_index(KeyTag::Subview as u8, short_key);
467 let context = self.context.clone_with_base_key(key);
468 let view = W::new(context)?;
469 let view = Arc::new(RwLock::new(view));
470 let view = Update::Set(view);
471 self.updates.insert(short_key.to_vec(), view);
472 Ok(())
473 }
474
    /// Gets the extra data, as exposed by the context of the collection.
    pub fn extra(&self) -> &<W::Context as Context>::Extra {
        self.context.extra()
    }
479}
480
481impl<W: View> ReentrantByteCollectionView<W::Context, W> {
482 /// Loads multiple entries for writing at once.
483 /// The entries in `short_keys` have to be all distinct.
484 /// ```rust
485 /// # tokio_test::block_on(async {
486 /// # use linera_views::context::MemoryContext;
487 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
488 /// # use linera_views::register_view::RegisterView;
489 /// # use linera_views::views::View;
490 /// # let context = MemoryContext::new_for_testing(());
491 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
492 /// ReentrantByteCollectionView::load(context).await.unwrap();
493 /// {
494 /// let mut subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
495 /// *subview.get_mut() = "Bonjour".to_string();
496 /// }
497 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
498 /// let subviews = view.try_load_entries_mut(short_keys).await.unwrap();
499 /// let value1 = subviews[0].get();
500 /// let value2 = subviews[1].get();
501 /// assert_eq!(*value1, "Bonjour".to_string());
502 /// assert_eq!(*value2, String::default());
503 /// # })
504 /// ```
505 pub async fn try_load_entries_mut(
506 &mut self,
507 short_keys: Vec<Vec<u8>>,
508 ) -> Result<Vec<WriteGuardedView<W>>, ViewError> {
509 let mut short_keys_to_load = Vec::new();
510 let mut keys = Vec::new();
511 for short_key in &short_keys {
512 let key = self
513 .context
514 .base_key()
515 .base_tag_index(KeyTag::Subview as u8, short_key);
516 let context = self.context.clone_with_base_key(key);
517 match self.updates.entry(short_key.to_vec()) {
518 btree_map::Entry::Occupied(mut entry) => {
519 if let Update::Removed = entry.get() {
520 let view = W::new(context)?;
521 let view = Arc::new(RwLock::new(view));
522 entry.insert(Update::Set(view));
523 }
524 }
525 btree_map::Entry::Vacant(entry) => {
526 if self.delete_storage_first {
527 let view = W::new(context)?;
528 let view = Arc::new(RwLock::new(view));
529 entry.insert(Update::Set(view));
530 } else {
531 keys.extend(W::pre_load(&context)?);
532 short_keys_to_load.push(short_key.to_vec());
533 }
534 }
535 }
536 }
537 let values = self.context.store().read_multi_values_bytes(keys).await?;
538 for (loaded_values, short_key) in values
539 .chunks_exact_or_repeat(W::NUM_INIT_KEYS)
540 .zip(short_keys_to_load)
541 {
542 let key = self
543 .context
544 .base_key()
545 .base_tag_index(KeyTag::Subview as u8, &short_key);
546 let context = self.context.clone_with_base_key(key);
547 let view = W::post_load(context, loaded_values)?;
548 let wrapped_view = Arc::new(RwLock::new(view));
549 self.updates
550 .insert(short_key.to_vec(), Update::Set(wrapped_view));
551 }
552
553 short_keys
554 .into_iter()
555 .map(|short_key| {
556 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
557 unreachable!()
558 };
559 Ok(WriteGuardedView(
560 view.clone()
561 .try_write_arc()
562 .ok_or_else(|| ViewError::TryLockError(short_key))?,
563 ))
564 })
565 .collect()
566 }
567
568 /// Loads multiple entries for writing at once with their keys.
569 /// The entries in short_keys have to be all distinct.
570 /// ```rust
571 /// # tokio_test::block_on(async {
572 /// # use linera_views::context::MemoryContext;
573 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
574 /// # use linera_views::register_view::RegisterView;
575 /// # use linera_views::views::View;
576 /// # let context = MemoryContext::new_for_testing(());
577 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
578 /// ReentrantByteCollectionView::load(context).await.unwrap();
579 /// {
580 /// let mut subview = view.try_load_entry_mut(&vec![0, 1]).await.unwrap();
581 /// *subview.get_mut() = "Bonjour".to_string();
582 /// }
583 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
584 /// let subviews = view.try_load_entries_pairs_mut(short_keys).await.unwrap();
585 /// let value1 = subviews[0].1.get();
586 /// let value2 = subviews[1].1.get();
587 /// assert_eq!(*value1, "Bonjour".to_string());
588 /// assert_eq!(*value2, String::default());
589 /// # })
590 /// ```
591 pub async fn try_load_entries_pairs_mut(
592 &mut self,
593 short_keys: Vec<Vec<u8>>,
594 ) -> Result<Vec<(Vec<u8>, WriteGuardedView<W>)>, ViewError> {
595 let values = self.try_load_entries_mut(short_keys.clone()).await?;
596 Ok(short_keys.into_iter().zip(values).collect())
597 }
598
599 /// Loads multiple entries for reading at once.
600 /// The entries in `short_keys` have to be all distinct.
601 /// ```rust
602 /// # tokio_test::block_on(async {
603 /// # use linera_views::context::MemoryContext;
604 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
605 /// # use linera_views::register_view::RegisterView;
606 /// # use linera_views::views::View;
607 /// # let context = MemoryContext::new_for_testing(());
608 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
609 /// ReentrantByteCollectionView::load(context).await.unwrap();
610 /// {
611 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
612 /// }
613 /// let short_keys = vec![vec![0, 1], vec![2, 3]];
614 /// let subviews = view.try_load_entries(short_keys).await.unwrap();
615 /// assert!(subviews[1].is_none());
616 /// let value0 = subviews[0].as_ref().unwrap().get();
617 /// assert_eq!(*value0, String::default());
618 /// # })
619 /// ```
620 pub async fn try_load_entries(
621 &self,
622 short_keys: Vec<Vec<u8>>,
623 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError> {
624 let mut results = vec![None; short_keys.len()];
625 let mut keys_to_check = Vec::new();
626 let mut keys_to_check_metadata = Vec::new();
627
628 for (position, short_key) in short_keys.into_iter().enumerate() {
629 if let Some(update) = self.updates.get(&short_key) {
630 if let Update::Set(view) = update {
631 results[position] = Some((short_key, view.clone()));
632 }
633 } else if !self.delete_storage_first {
634 let key_index = self
635 .context
636 .base_key()
637 .base_tag_index(KeyTag::Index as u8, &short_key);
638 keys_to_check.push(key_index);
639 keys_to_check_metadata.push((position, short_key));
640 }
641 }
642
643 let found_keys = self.context.store().contains_keys(keys_to_check).await?;
644 let entries_to_load = keys_to_check_metadata
645 .into_iter()
646 .zip(found_keys)
647 .filter_map(|(metadata, found)| found.then_some(metadata))
648 .map(|(position, short_key)| {
649 let subview_key = self
650 .context
651 .base_key()
652 .base_tag_index(KeyTag::Subview as u8, &short_key);
653 let subview_context = self.context.clone_with_base_key(subview_key);
654 (position, short_key.to_owned(), subview_context)
655 })
656 .collect::<Vec<_>>();
657 if !entries_to_load.is_empty() {
658 let mut keys_to_load = Vec::with_capacity(entries_to_load.len() * W::NUM_INIT_KEYS);
659 for (_, _, context) in &entries_to_load {
660 keys_to_load.extend(W::pre_load(context)?);
661 }
662 let values = self
663 .context
664 .store()
665 .read_multi_values_bytes(keys_to_load)
666 .await?;
667 for (loaded_values, (position, short_key, context)) in values
668 .chunks_exact_or_repeat(W::NUM_INIT_KEYS)
669 .zip(entries_to_load)
670 {
671 let view = W::post_load(context, loaded_values)?;
672 let wrapped_view = Arc::new(RwLock::new(view));
673 results[position] = Some((short_key, wrapped_view));
674 }
675 }
676
677 results
678 .into_iter()
679 .map(|maybe_view| match maybe_view {
680 Some((short_key, view)) => Ok(Some(ReadGuardedView(
681 view.try_read_arc()
682 .ok_or_else(|| ViewError::TryLockError(short_key))?,
683 ))),
684 None => Ok(None),
685 })
686 .collect()
687 }
688
689 /// Loads multiple entries for reading at once with their keys.
690 /// The entries in short_keys have to be all distinct.
691 /// ```rust
692 /// # tokio_test::block_on(async {
693 /// # use linera_views::context::MemoryContext;
694 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
695 /// # use linera_views::register_view::RegisterView;
696 /// # use linera_views::views::View;
697 /// # let context = MemoryContext::new_for_testing(());
698 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
699 /// ReentrantByteCollectionView::load(context).await.unwrap();
700 /// {
701 /// let _subview = view.try_load_entry_mut(&vec![0, 1]).await.unwrap();
702 /// }
703 /// let short_keys = vec![vec![0, 1], vec![0, 2]];
704 /// let subviews = view.try_load_entries_pairs(short_keys).await.unwrap();
705 /// assert!(subviews[1].1.is_none());
706 /// let value0 = subviews[0].1.as_ref().unwrap().get();
707 /// assert_eq!(*value0, String::default());
708 /// # })
709 /// ```
710 pub async fn try_load_entries_pairs(
711 &self,
712 short_keys: Vec<Vec<u8>>,
713 ) -> Result<Vec<(Vec<u8>, Option<ReadGuardedView<W>>)>, ViewError> {
714 let values = self.try_load_entries(short_keys.clone()).await?;
715 Ok(short_keys.into_iter().zip(values).collect())
716 }
717
718 /// Loads all the entries for reading at once.
719 /// ```rust
720 /// # tokio_test::block_on(async {
721 /// # use linera_views::context::MemoryContext;
722 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
723 /// # use linera_views::register_view::RegisterView;
724 /// # use linera_views::views::View;
725 /// # let context = MemoryContext::new_for_testing(());
726 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
727 /// ReentrantByteCollectionView::load(context).await.unwrap();
728 /// {
729 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
730 /// }
731 /// let subviews = view.try_load_all_entries().await.unwrap();
732 /// assert_eq!(subviews.len(), 1);
733 /// # })
734 /// ```
735 pub async fn try_load_all_entries(
736 &self,
737 ) -> Result<Vec<(Vec<u8>, ReadGuardedView<W>)>, ViewError> {
738 let short_keys = self.keys().await?;
739 let mut loaded_views = vec![None; short_keys.len()];
740
741 // Load views that are not in updates and not deleted
742 if !self.delete_storage_first {
743 let mut keys = Vec::new();
744 let mut short_keys_and_indexes = Vec::new();
745 for (index, short_key) in short_keys.iter().enumerate() {
746 if !self.updates.contains_key(short_key) {
747 let key = self
748 .context
749 .base_key()
750 .base_tag_index(KeyTag::Subview as u8, short_key);
751 let context = self.context.clone_with_base_key(key);
752 keys.extend(W::pre_load(&context)?);
753 short_keys_and_indexes.push((short_key.to_vec(), index));
754 }
755 }
756 let values = self.context.store().read_multi_values_bytes(keys).await?;
757 for (loaded_values, (short_key, index)) in values
758 .chunks_exact_or_repeat(W::NUM_INIT_KEYS)
759 .zip(short_keys_and_indexes)
760 {
761 let key = self
762 .context
763 .base_key()
764 .base_tag_index(KeyTag::Subview as u8, &short_key);
765 let context = self.context.clone_with_base_key(key);
766 let view = W::post_load(context, loaded_values)?;
767 let wrapped_view = Arc::new(RwLock::new(view));
768 loaded_views[index] = Some(wrapped_view);
769 }
770 }
771
772 // Create result from updates and loaded views
773 short_keys
774 .into_iter()
775 .zip(loaded_views)
776 .map(|(short_key, loaded_view)| {
777 let view = if let Some(Update::Set(view)) = self.updates.get(&short_key) {
778 view.clone()
779 } else if let Some(view) = loaded_view {
780 view
781 } else {
782 unreachable!("All entries should have been loaded into memory");
783 };
784 let guard = ReadGuardedView(
785 view.try_read_arc()
786 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
787 );
788 Ok((short_key, guard))
789 })
790 .collect()
791 }
792
793 /// Loads all the entries for writing at once.
794 /// ```rust
795 /// # tokio_test::block_on(async {
796 /// # use linera_views::context::MemoryContext;
797 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
798 /// # use linera_views::register_view::RegisterView;
799 /// # use linera_views::views::View;
800 /// # let context = MemoryContext::new_for_testing(());
801 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
802 /// ReentrantByteCollectionView::load(context).await.unwrap();
803 /// {
804 /// let _subview = view.try_load_entry_mut(&[0, 1]).await.unwrap();
805 /// }
806 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
807 /// assert_eq!(subviews.len(), 1);
808 /// # })
809 /// ```
810 pub async fn try_load_all_entries_mut(
811 &mut self,
812 ) -> Result<Vec<(Vec<u8>, WriteGuardedView<W>)>, ViewError> {
813 let short_keys = self.keys().await?;
814 if !self.delete_storage_first {
815 let mut keys = Vec::new();
816 let mut short_keys_to_load = Vec::new();
817
818 for short_key in &short_keys {
819 if !self.updates.contains_key(short_key) {
820 let key = self
821 .context
822 .base_key()
823 .base_tag_index(KeyTag::Subview as u8, short_key);
824 let context = self.context.clone_with_base_key(key);
825 keys.extend(W::pre_load(&context)?);
826 short_keys_to_load.push(short_key.to_vec());
827 }
828 }
829
830 let values = self.context.store().read_multi_values_bytes(keys).await?;
831 for (loaded_values, short_key) in values
832 .chunks_exact_or_repeat(W::NUM_INIT_KEYS)
833 .zip(short_keys_to_load)
834 {
835 let key = self
836 .context
837 .base_key()
838 .base_tag_index(KeyTag::Subview as u8, &short_key);
839 let context = self.context.clone_with_base_key(key);
840 let view = W::post_load(context, loaded_values)?;
841 let wrapped_view = Arc::new(RwLock::new(view));
842 self.updates
843 .insert(short_key.to_vec(), Update::Set(wrapped_view));
844 }
845 }
846 short_keys
847 .into_iter()
848 .map(|short_key| {
849 let Some(Update::Set(view)) = self.updates.get(&short_key) else {
850 unreachable!("All entries should have been loaded into `updates`")
851 };
852 let guard = WriteGuardedView(
853 view.clone()
854 .try_write_arc()
855 .ok_or_else(|| ViewError::TryLockError(short_key.clone()))?,
856 );
857 Ok((short_key, guard))
858 })
859 .collect()
860 }
861}
862
863impl<W: View> ReentrantByteCollectionView<W::Context, W> {
864 /// Returns the list of indices in the collection in lexicographic order.
865 /// ```rust
866 /// # tokio_test::block_on(async {
867 /// # use linera_views::context::MemoryContext;
868 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
869 /// # use linera_views::register_view::RegisterView;
870 /// # use linera_views::views::View;
871 /// # let context = MemoryContext::new_for_testing(());
872 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
873 /// ReentrantByteCollectionView::load(context).await.unwrap();
874 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
875 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
876 /// let keys = view.keys().await.unwrap();
877 /// assert_eq!(keys, vec![vec![0, 1], vec![0, 2]]);
878 /// # })
879 /// ```
880 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
881 let mut keys = Vec::new();
882 self.for_each_key(|key| {
883 keys.push(key.to_vec());
884 Ok(())
885 })
886 .await?;
887 Ok(keys)
888 }
889
890 /// Returns the number of indices of the collection.
891 /// ```rust
892 /// # tokio_test::block_on(async {
893 /// # use linera_views::context::MemoryContext;
894 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
895 /// # use linera_views::register_view::RegisterView;
896 /// # use linera_views::views::View;
897 /// # let context = MemoryContext::new_for_testing(());
898 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
899 /// ReentrantByteCollectionView::load(context).await.unwrap();
900 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
901 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
902 /// assert_eq!(view.count().await.unwrap(), 2);
903 /// # })
904 /// ```
905 pub async fn count(&self) -> Result<usize, ViewError> {
906 let mut count = 0;
907 self.for_each_key(|_key| {
908 count += 1;
909 Ok(())
910 })
911 .await?;
912 Ok(count)
913 }
914
915 /// Applies a function f on each index (aka key). Keys are visited in a
916 /// lexicographic order. If the function returns false then the loop
917 /// ends prematurely.
918 /// ```rust
919 /// # tokio_test::block_on(async {
920 /// # use linera_views::context::MemoryContext;
921 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
922 /// # use linera_views::register_view::RegisterView;
923 /// # use linera_views::views::View;
924 /// # let context = MemoryContext::new_for_testing(());
925 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
926 /// ReentrantByteCollectionView::load(context).await.unwrap();
927 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
928 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
929 /// let mut count = 0;
930 /// view.for_each_key_while(|_key| {
931 /// count += 1;
932 /// Ok(count < 1)
933 /// })
934 /// .await
935 /// .unwrap();
936 /// assert_eq!(count, 1);
937 /// # })
938 /// ```
939 pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
940 where
941 F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
942 {
943 let mut updates = self.updates.iter();
944 let mut update = updates.next();
945 if !self.delete_storage_first {
946 let base = self.get_index_key(&[]);
947 for index in self.context.store().find_keys_by_prefix(&base).await? {
948 loop {
949 match update {
950 Some((key, value)) if key <= &index => {
951 if let Update::Set(_) = value {
952 if !f(key)? {
953 return Ok(());
954 }
955 }
956 update = updates.next();
957 if key == &index {
958 break;
959 }
960 }
961 _ => {
962 if !f(&index)? {
963 return Ok(());
964 }
965 break;
966 }
967 }
968 }
969 }
970 }
971 while let Some((key, value)) = update {
972 if let Update::Set(_) = value {
973 if !f(key)? {
974 return Ok(());
975 }
976 }
977 update = updates.next();
978 }
979 Ok(())
980 }
981
982 /// Applies a function f on each index (aka key). Keys are visited in a
983 /// lexicographic order.
984 /// ```rust
985 /// # tokio_test::block_on(async {
986 /// # use linera_views::context::MemoryContext;
987 /// # use linera_views::reentrant_collection_view::ReentrantByteCollectionView;
988 /// # use linera_views::register_view::RegisterView;
989 /// # use linera_views::views::View;
990 /// # let context = MemoryContext::new_for_testing(());
991 /// let mut view: ReentrantByteCollectionView<_, RegisterView<_, String>> =
992 /// ReentrantByteCollectionView::load(context).await.unwrap();
993 /// view.try_load_entry_mut(&[0, 1]).await.unwrap();
994 /// view.try_load_entry_mut(&[0, 2]).await.unwrap();
995 /// let mut count = 0;
996 /// view.for_each_key(|_key| {
997 /// count += 1;
998 /// Ok(())
999 /// })
1000 /// .await
1001 /// .unwrap();
1002 /// assert_eq!(count, 2);
1003 /// # })
1004 /// ```
1005 pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
1006 where
1007 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
1008 {
1009 self.for_each_key_while(|key| {
1010 f(key)?;
1011 Ok(true)
1012 })
1013 .await
1014 }
1015}
1016
1017impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W> {
1018 type Hasher = sha3::Sha3_256;
1019
1020 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1021 #[cfg(with_metrics)]
1022 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
1023 let mut hasher = sha3::Sha3_256::default();
1024 let keys = self.keys().await?;
1025 let count = keys.len() as u32;
1026 hasher.update_with_bcs_bytes(&count)?;
1027 for key in keys {
1028 hasher.update_with_bytes(&key)?;
1029 let hash = if let Some(entry) = self.updates.get_mut(&key) {
1030 let Update::Set(view) = entry else {
1031 unreachable!();
1032 };
1033 let mut view = view
1034 .try_write_arc()
1035 .ok_or_else(|| ViewError::TryLockError(key))?;
1036 view.hash_mut().await?
1037 } else {
1038 let key = self
1039 .context
1040 .base_key()
1041 .base_tag_index(KeyTag::Subview as u8, &key);
1042 let context = self.context.clone_with_base_key(key);
1043 let mut view = W::load(context).await?;
1044 view.hash_mut().await?
1045 };
1046 hasher.write_all(hash.as_ref())?;
1047 }
1048 Ok(hasher.finalize())
1049 }
1050
1051 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1052 #[cfg(with_metrics)]
1053 let _hash_latency = metrics::REENTRANT_COLLECTION_VIEW_HASH_RUNTIME.measure_latency();
1054 let mut hasher = sha3::Sha3_256::default();
1055 let keys = self.keys().await?;
1056 let count = keys.len() as u32;
1057 hasher.update_with_bcs_bytes(&count)?;
1058 for key in keys {
1059 hasher.update_with_bytes(&key)?;
1060 let hash = if let Some(entry) = self.updates.get(&key) {
1061 let Update::Set(view) = entry else {
1062 unreachable!();
1063 };
1064 let view = view
1065 .try_read_arc()
1066 .ok_or_else(|| ViewError::TryLockError(key))?;
1067 view.hash().await?
1068 } else {
1069 let key = self
1070 .context
1071 .base_key()
1072 .base_tag_index(KeyTag::Subview as u8, &key);
1073 let context = self.context.clone_with_base_key(key);
1074 let view = W::load(context).await?;
1075 view.hash().await?
1076 };
1077 hasher.write_all(hash.as_ref())?;
1078 }
1079 Ok(hasher.finalize())
1080 }
1081}
1082
/// A view that supports accessing a collection of views of the same kind, indexed by keys,
/// possibly several subviews at a time.
///
/// This is a typed wrapper around a [`ReentrantByteCollectionView`]: indices of type `I`
/// are converted to byte keys with `BaseKey::derive_short_key` before being handed to
/// the inner collection.
#[derive(Debug)]
pub struct ReentrantCollectionView<C, I, W> {
    // The byte-keyed collection that does the actual work.
    collection: ReentrantByteCollectionView<C, W>,
    // Records the index type `I` without storing any value of it.
    _phantom: PhantomData<I>,
}
1090
1091impl<I, W, C2> ReplaceContext<C2> for ReentrantCollectionView<W::Context, I, W>
1092where
1093 W: View + ReplaceContext<C2>,
1094 I: Send + Sync + Serialize + DeserializeOwned,
1095 C2: Context,
1096{
1097 type Target = ReentrantCollectionView<C2, I, <W as ReplaceContext<C2>>::Target>;
1098
1099 async fn with_context(
1100 &mut self,
1101 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
1102 ) -> Self::Target {
1103 ReentrantCollectionView {
1104 collection: self.collection.with_context(ctx).await,
1105 _phantom: self._phantom,
1106 }
1107 }
1108}
1109
1110impl<I, W> View for ReentrantCollectionView<W::Context, I, W>
1111where
1112 W: View,
1113 I: Send + Sync + Serialize + DeserializeOwned,
1114{
1115 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1116
1117 type Context = W::Context;
1118
1119 fn context(&self) -> &Self::Context {
1120 self.collection.context()
1121 }
1122
1123 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1124 ReentrantByteCollectionView::<W::Context, W>::pre_load(context)
1125 }
1126
1127 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1128 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1129 Ok(ReentrantCollectionView {
1130 collection,
1131 _phantom: PhantomData,
1132 })
1133 }
1134
1135 fn rollback(&mut self) {
1136 self.collection.rollback()
1137 }
1138
1139 async fn has_pending_changes(&self) -> bool {
1140 self.collection.has_pending_changes().await
1141 }
1142
1143 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1144 self.collection.flush(batch)
1145 }
1146
1147 fn clear(&mut self) {
1148 self.collection.clear()
1149 }
1150}
1151
1152impl<I, W> ClonableView for ReentrantCollectionView<W::Context, I, W>
1153where
1154 W: ClonableView,
1155 I: Send + Sync + Serialize + DeserializeOwned,
1156{
1157 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
1158 Ok(ReentrantCollectionView {
1159 collection: self.collection.clone_unchecked()?,
1160 _phantom: PhantomData,
1161 })
1162 }
1163}
1164
1165impl<I, W> ReentrantCollectionView<W::Context, I, W>
1166where
1167 W: View,
1168 I: Sync + Send + Serialize + DeserializeOwned,
1169{
1170 /// Loads a subview for the data at the given index in the collection. If an entry
1171 /// is absent then a default entry is put on the collection. The obtained view can
1172 /// then be modified.
1173 /// ```rust
1174 /// # tokio_test::block_on(async {
1175 /// # use linera_views::context::MemoryContext;
1176 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1177 /// # use linera_views::register_view::RegisterView;
1178 /// # use linera_views::views::View;
1179 /// # let context = MemoryContext::new_for_testing(());
1180 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1181 /// ReentrantCollectionView::load(context).await.unwrap();
1182 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1183 /// let value = subview.get();
1184 /// assert_eq!(*value, String::default());
1185 /// # })
1186 /// ```
1187 pub async fn try_load_entry_mut<Q>(
1188 &mut self,
1189 index: &Q,
1190 ) -> Result<WriteGuardedView<W>, ViewError>
1191 where
1192 I: Borrow<Q>,
1193 Q: Serialize + ?Sized,
1194 {
1195 let short_key = BaseKey::derive_short_key(index)?;
1196 self.collection.try_load_entry_mut(&short_key).await
1197 }
1198
1199 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1200 /// If an entry is absent then `None` is returned.
1201 /// ```rust
1202 /// # tokio_test::block_on(async {
1203 /// # use linera_views::context::MemoryContext;
1204 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1205 /// # use linera_views::register_view::RegisterView;
1206 /// # use linera_views::views::View;
1207 /// # let context = MemoryContext::new_for_testing(());
1208 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1209 /// ReentrantCollectionView::load(context).await.unwrap();
1210 /// {
1211 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1212 /// }
1213 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1214 /// let value = subview.get();
1215 /// assert_eq!(*value, String::default());
1216 /// # })
1217 /// ```
1218 pub async fn try_load_entry<Q>(
1219 &self,
1220 index: &Q,
1221 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1222 where
1223 I: Borrow<Q>,
1224 Q: Serialize + ?Sized,
1225 {
1226 let short_key = BaseKey::derive_short_key(index)?;
1227 self.collection.try_load_entry(&short_key).await
1228 }
1229
1230 /// Returns `true` if the collection contains a value for the specified key.
1231 /// ```rust
1232 /// # tokio_test::block_on(async {
1233 /// # use linera_views::context::MemoryContext;
1234 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1235 /// # use linera_views::register_view::RegisterView;
1236 /// # use linera_views::views::View;
1237 /// # let context = MemoryContext::new_for_testing(());
1238 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1239 /// ReentrantCollectionView::load(context).await.unwrap();
1240 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1241 /// assert!(view.contains_key(&23).await.unwrap());
1242 /// assert!(!view.contains_key(&24).await.unwrap());
1243 /// # })
1244 /// ```
1245 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1246 where
1247 I: Borrow<Q>,
1248 Q: Serialize + ?Sized,
1249 {
1250 let short_key = BaseKey::derive_short_key(index)?;
1251 self.collection.contains_key(&short_key).await
1252 }
1253
1254 /// Marks the entry so that it is removed in the next flush.
1255 /// ```rust
1256 /// # tokio_test::block_on(async {
1257 /// # use linera_views::context::MemoryContext;
1258 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1259 /// # use linera_views::register_view::RegisterView;
1260 /// # use linera_views::views::View;
1261 /// # let context = MemoryContext::new_for_testing(());
1262 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1263 /// ReentrantCollectionView::load(context).await.unwrap();
1264 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1265 /// let value = subview.get_mut();
1266 /// assert_eq!(*value, String::default());
1267 /// view.remove_entry(&23);
1268 /// let keys = view.indices().await.unwrap();
1269 /// assert_eq!(keys.len(), 0);
1270 /// # })
1271 /// ```
1272 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1273 where
1274 I: Borrow<Q>,
1275 Q: Serialize + ?Sized,
1276 {
1277 let short_key = BaseKey::derive_short_key(index)?;
1278 self.collection.remove_entry(short_key);
1279 Ok(())
1280 }
1281
1282 /// Marks the entry so that it is removed in the next flush.
1283 /// ```rust
1284 /// # tokio_test::block_on(async {
1285 /// # use linera_views::context::MemoryContext;
1286 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1287 /// # use linera_views::register_view::RegisterView;
1288 /// # use linera_views::views::View;
1289 /// # let context = MemoryContext::new_for_testing(());
1290 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1291 /// ReentrantCollectionView::load(context).await.unwrap();
1292 /// {
1293 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1294 /// let value = subview.get_mut();
1295 /// *value = String::from("Hello");
1296 /// }
1297 /// view.try_reset_entry_to_default(&23).unwrap();
1298 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1299 /// let value = subview.get_mut();
1300 /// assert_eq!(*value, String::default());
1301 /// # })
1302 /// ```
1303 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1304 where
1305 I: Borrow<Q>,
1306 Q: Serialize + ?Sized,
1307 {
1308 let short_key = BaseKey::derive_short_key(index)?;
1309 self.collection.try_reset_entry_to_default(&short_key)
1310 }
1311
1312 /// Gets the extra data.
1313 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1314 self.collection.extra()
1315 }
1316}
1317
1318impl<I, W> ReentrantCollectionView<W::Context, I, W>
1319where
1320 W: View,
1321 I: Sync + Send + Serialize + DeserializeOwned,
1322{
1323 /// Load multiple entries for writing at once.
1324 /// The entries in indices have to be all distinct.
1325 /// ```rust
1326 /// # tokio_test::block_on(async {
1327 /// # use linera_views::context::MemoryContext;
1328 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1329 /// # use linera_views::register_view::RegisterView;
1330 /// # use linera_views::views::View;
1331 /// # let context = MemoryContext::new_for_testing(());
1332 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1333 /// ReentrantCollectionView::load(context).await.unwrap();
1334 /// let indices = vec![23, 42];
1335 /// let subviews = view.try_load_entries_mut(&indices).await.unwrap();
1336 /// let value1 = subviews[0].get();
1337 /// let value2 = subviews[1].get();
1338 /// assert_eq!(*value1, String::default());
1339 /// assert_eq!(*value2, String::default());
1340 /// # })
1341 /// ```
1342 pub async fn try_load_entries_mut<'a, Q>(
1343 &'a mut self,
1344 indices: impl IntoIterator<Item = &'a Q>,
1345 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1346 where
1347 I: Borrow<Q>,
1348 Q: Serialize + 'a,
1349 {
1350 let short_keys = indices
1351 .into_iter()
1352 .map(|index| BaseKey::derive_short_key(index))
1353 .collect::<Result<_, _>>()?;
1354 self.collection.try_load_entries_mut(short_keys).await
1355 }
1356
1357 /// Loads multiple entries for writing at once with their keys.
1358 /// The entries in indices have to be all distinct.
1359 /// ```rust
1360 /// # tokio_test::block_on(async {
1361 /// # use linera_views::context::MemoryContext;
1362 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1363 /// # use linera_views::register_view::RegisterView;
1364 /// # use linera_views::views::View;
1365 /// # let context = MemoryContext::new_for_testing(());
1366 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1367 /// ReentrantCollectionView::load(context).await.unwrap();
1368 /// let indices = [23, 42];
1369 /// let subviews = view.try_load_entries_pairs_mut(indices).await.unwrap();
1370 /// let value1 = subviews[0].1.get();
1371 /// let value2 = subviews[1].1.get();
1372 /// assert_eq!(*value1, String::default());
1373 /// assert_eq!(*value2, String::default());
1374 /// # })
1375 /// ```
1376 pub async fn try_load_entries_pairs_mut<Q>(
1377 &mut self,
1378 indices: impl IntoIterator<Item = Q>,
1379 ) -> Result<Vec<(Q, WriteGuardedView<W>)>, ViewError>
1380 where
1381 I: Borrow<Q>,
1382 Q: Serialize + Clone,
1383 {
1384 let indices_vec: Vec<Q> = indices.into_iter().collect();
1385 let values = self.try_load_entries_mut(indices_vec.iter()).await?;
1386 Ok(indices_vec.into_iter().zip(values).collect())
1387 }
1388
1389 /// Load multiple entries for reading at once.
1390 /// The entries in indices have to be all distinct.
1391 /// ```rust
1392 /// # tokio_test::block_on(async {
1393 /// # use linera_views::context::MemoryContext;
1394 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1395 /// # use linera_views::register_view::RegisterView;
1396 /// # use linera_views::views::View;
1397 /// # let context = MemoryContext::new_for_testing(());
1398 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1399 /// ReentrantCollectionView::load(context).await.unwrap();
1400 /// {
1401 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1402 /// }
1403 /// let indices = vec![23, 42];
1404 /// let subviews = view.try_load_entries(&indices).await.unwrap();
1405 /// assert!(subviews[1].is_none());
1406 /// let value0 = subviews[0].as_ref().unwrap().get();
1407 /// assert_eq!(*value0, String::default());
1408 /// # })
1409 /// ```
1410 pub async fn try_load_entries<'a, Q>(
1411 &'a self,
1412 indices: impl IntoIterator<Item = &'a Q>,
1413 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1414 where
1415 I: Borrow<Q>,
1416 Q: Serialize + 'a,
1417 {
1418 let short_keys = indices
1419 .into_iter()
1420 .map(|index| BaseKey::derive_short_key(index))
1421 .collect::<Result<_, _>>()?;
1422 self.collection.try_load_entries(short_keys).await
1423 }
1424
1425 /// Loads multiple entries for reading at once with their keys.
1426 /// The entries in indices have to be all distinct.
1427 /// ```rust
1428 /// # tokio_test::block_on(async {
1429 /// # use linera_views::context::MemoryContext;
1430 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1431 /// # use linera_views::register_view::RegisterView;
1432 /// # use linera_views::views::View;
1433 /// # let context = MemoryContext::new_for_testing(());
1434 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1435 /// ReentrantCollectionView::load(context).await.unwrap();
1436 /// {
1437 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1438 /// }
1439 /// let indices = [23, 42];
1440 /// let subviews = view.try_load_entries_pairs(indices).await.unwrap();
1441 /// assert!(subviews[1].1.is_none());
1442 /// let value0 = subviews[0].1.as_ref().unwrap().get();
1443 /// assert_eq!(*value0, String::default());
1444 /// # })
1445 /// ```
1446 pub async fn try_load_entries_pairs<Q>(
1447 &self,
1448 indices: impl IntoIterator<Item = Q>,
1449 ) -> Result<Vec<(Q, Option<ReadGuardedView<W>>)>, ViewError>
1450 where
1451 I: Borrow<Q>,
1452 Q: Serialize + Clone,
1453 {
1454 let indices_vec: Vec<Q> = indices.into_iter().collect();
1455 let values = self.try_load_entries(indices_vec.iter()).await?;
1456 Ok(indices_vec.into_iter().zip(values).collect())
1457 }
1458
1459 /// Loads all entries for writing at once.
1460 /// The entries in indices have to be all distinct.
1461 /// ```rust
1462 /// # tokio_test::block_on(async {
1463 /// # use linera_views::context::MemoryContext;
1464 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1465 /// # use linera_views::register_view::RegisterView;
1466 /// # use linera_views::views::View;
1467 /// # let context = MemoryContext::new_for_testing(());
1468 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1469 /// ReentrantCollectionView::load(context).await.unwrap();
1470 /// {
1471 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1472 /// }
1473 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
1474 /// assert_eq!(subviews.len(), 1);
1475 /// # })
1476 /// ```
1477 pub async fn try_load_all_entries_mut(
1478 &mut self,
1479 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError> {
1480 let results = self.collection.try_load_all_entries_mut().await?;
1481 results
1482 .into_iter()
1483 .map(|(short_key, view)| {
1484 let index = BaseKey::deserialize_value(&short_key)?;
1485 Ok((index, view))
1486 })
1487 .collect()
1488 }
1489
1490 /// Load multiple entries for reading at once.
1491 /// The entries in indices have to be all distinct.
1492 /// ```rust
1493 /// # tokio_test::block_on(async {
1494 /// # use linera_views::context::MemoryContext;
1495 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1496 /// # use linera_views::register_view::RegisterView;
1497 /// # use linera_views::views::View;
1498 /// # let context = MemoryContext::new_for_testing(());
1499 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1500 /// ReentrantCollectionView::load(context).await.unwrap();
1501 /// {
1502 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1503 /// }
1504 /// let subviews = view.try_load_all_entries().await.unwrap();
1505 /// assert_eq!(subviews.len(), 1);
1506 /// # })
1507 /// ```
1508 pub async fn try_load_all_entries(&self) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError> {
1509 let results = self.collection.try_load_all_entries().await?;
1510 results
1511 .into_iter()
1512 .map(|(short_key, view)| {
1513 let index = BaseKey::deserialize_value(&short_key)?;
1514 Ok((index, view))
1515 })
1516 .collect()
1517 }
1518}
1519
1520impl<I, W> ReentrantCollectionView<W::Context, I, W>
1521where
1522 W: View,
1523 I: Sync + Send + Serialize + DeserializeOwned,
1524{
1525 /// Returns the list of indices in the collection in an order determined
1526 /// by serialization.
1527 /// ```rust
1528 /// # tokio_test::block_on(async {
1529 /// # use linera_views::context::MemoryContext;
1530 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1531 /// # use linera_views::register_view::RegisterView;
1532 /// # use linera_views::views::View;
1533 /// # let context = MemoryContext::new_for_testing(());
1534 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1535 /// ReentrantCollectionView::load(context).await.unwrap();
1536 /// view.try_load_entry_mut(&23).await.unwrap();
1537 /// view.try_load_entry_mut(&25).await.unwrap();
1538 /// let indices = view.indices().await.unwrap();
1539 /// assert_eq!(indices.len(), 2);
1540 /// # })
1541 /// ```
1542 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
1543 let mut indices = Vec::new();
1544 self.for_each_index(|index| {
1545 indices.push(index);
1546 Ok(())
1547 })
1548 .await?;
1549 Ok(indices)
1550 }
1551
1552 /// Returns the number of indices in the collection.
1553 /// ```rust
1554 /// # tokio_test::block_on(async {
1555 /// # use linera_views::context::MemoryContext;
1556 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1557 /// # use linera_views::register_view::RegisterView;
1558 /// # use linera_views::views::View;
1559 /// # let context = MemoryContext::new_for_testing(());
1560 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1561 /// ReentrantCollectionView::load(context).await.unwrap();
1562 /// view.try_load_entry_mut(&23).await.unwrap();
1563 /// view.try_load_entry_mut(&25).await.unwrap();
1564 /// assert_eq!(view.count().await.unwrap(), 2);
1565 /// # })
1566 /// ```
1567 pub async fn count(&self) -> Result<usize, ViewError> {
1568 self.collection.count().await
1569 }
1570
1571 /// Applies a function f on each index. Indices are visited in an order
1572 /// determined by the serialization. If the function f returns false then
1573 /// the loop ends prematurely.
1574 /// ```rust
1575 /// # tokio_test::block_on(async {
1576 /// # use linera_views::context::MemoryContext;
1577 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1578 /// # use linera_views::register_view::RegisterView;
1579 /// # use linera_views::views::View;
1580 /// # let context = MemoryContext::new_for_testing(());
1581 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1582 /// ReentrantCollectionView::load(context).await.unwrap();
1583 /// view.try_load_entry_mut(&23).await.unwrap();
1584 /// view.try_load_entry_mut(&24).await.unwrap();
1585 /// let mut count = 0;
1586 /// view.for_each_index_while(|_key| {
1587 /// count += 1;
1588 /// Ok(count < 1)
1589 /// })
1590 /// .await
1591 /// .unwrap();
1592 /// assert_eq!(count, 1);
1593 /// # })
1594 /// ```
1595 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
1596 where
1597 F: FnMut(I) -> Result<bool, ViewError> + Send,
1598 {
1599 self.collection
1600 .for_each_key_while(|key| {
1601 let index = BaseKey::deserialize_value(key)?;
1602 f(index)
1603 })
1604 .await?;
1605 Ok(())
1606 }
1607
1608 /// Applies a function f on each index. Indices are visited in an order
1609 /// determined by the serialization.
1610 /// ```rust
1611 /// # tokio_test::block_on(async {
1612 /// # use linera_views::context::MemoryContext;
1613 /// # use linera_views::reentrant_collection_view::ReentrantCollectionView;
1614 /// # use linera_views::register_view::RegisterView;
1615 /// # use linera_views::views::View;
1616 /// # let context = MemoryContext::new_for_testing(());
1617 /// let mut view: ReentrantCollectionView<_, u64, RegisterView<_, String>> =
1618 /// ReentrantCollectionView::load(context).await.unwrap();
1619 /// view.try_load_entry_mut(&23).await.unwrap();
1620 /// view.try_load_entry_mut(&28).await.unwrap();
1621 /// let mut count = 0;
1622 /// view.for_each_index(|_key| {
1623 /// count += 1;
1624 /// Ok(())
1625 /// })
1626 /// .await
1627 /// .unwrap();
1628 /// assert_eq!(count, 2);
1629 /// # })
1630 /// ```
1631 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
1632 where
1633 F: FnMut(I) -> Result<(), ViewError> + Send,
1634 {
1635 self.collection
1636 .for_each_key(|key| {
1637 let index = BaseKey::deserialize_value(key)?;
1638 f(index)
1639 })
1640 .await?;
1641 Ok(())
1642 }
1643}
1644
/// Hashing is delegated to the inner byte collection, so the hash only depends on
/// the serialized keys and the subviews.
impl<I, W> HashableView for ReentrantCollectionView<W::Context, I, W>
where
    W: HashableView,
    I: Send + Sync + Serialize + DeserializeOwned,
{
    type Hasher = sha3::Sha3_256;

    /// Returns the result of `hash_mut` on the inner collection.
    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.collection.hash_mut().await
    }

    /// Returns the result of `hash` on the inner collection.
    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.collection.hash().await
    }
}
1660
/// A view that supports accessing a collection of views of the same kind, indexed by an ordered key,
/// possibly several subviews at a time.
///
/// This is a typed wrapper around a [`ReentrantByteCollectionView`]: indices are
/// converted to byte keys with `CustomSerialize::to_custom_bytes` before being handed
/// to the inner collection.
#[derive(Debug)]
pub struct ReentrantCustomCollectionView<C, I, W> {
    // The byte-keyed collection that does the actual work.
    collection: ReentrantByteCollectionView<C, W>,
    // Records the index type `I` without storing any value of it.
    _phantom: PhantomData<I>,
}
1668
1669impl<I, W> View for ReentrantCustomCollectionView<W::Context, I, W>
1670where
1671 W: View,
1672 I: Send + Sync + CustomSerialize,
1673{
1674 const NUM_INIT_KEYS: usize = ReentrantByteCollectionView::<W::Context, W>::NUM_INIT_KEYS;
1675
1676 type Context = W::Context;
1677
1678 fn context(&self) -> &Self::Context {
1679 self.collection.context()
1680 }
1681
1682 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
1683 ReentrantByteCollectionView::<_, W>::pre_load(context)
1684 }
1685
1686 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
1687 let collection = ReentrantByteCollectionView::post_load(context, values)?;
1688 Ok(ReentrantCustomCollectionView {
1689 collection,
1690 _phantom: PhantomData,
1691 })
1692 }
1693
1694 fn rollback(&mut self) {
1695 self.collection.rollback()
1696 }
1697
1698 async fn has_pending_changes(&self) -> bool {
1699 self.collection.has_pending_changes().await
1700 }
1701
1702 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
1703 self.collection.flush(batch)
1704 }
1705
1706 fn clear(&mut self) {
1707 self.collection.clear()
1708 }
1709}
1710
1711impl<I, W> ClonableView for ReentrantCustomCollectionView<W::Context, I, W>
1712where
1713 W: ClonableView,
1714 Self: View,
1715{
1716 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
1717 Ok(ReentrantCustomCollectionView {
1718 collection: self.collection.clone_unchecked()?,
1719 _phantom: PhantomData,
1720 })
1721 }
1722}
1723
1724impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
1725where
1726 W: View,
1727 I: Sync + Send + CustomSerialize,
1728{
1729 /// Loads a subview for the data at the given index in the collection. If an entry
1730 /// is absent then a default entry is put in the collection on this index.
1731 /// ```rust
1732 /// # tokio_test::block_on(async {
1733 /// # use linera_views::context::MemoryContext;
1734 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1735 /// # use linera_views::register_view::RegisterView;
1736 /// # use linera_views::views::View;
1737 /// # let context = MemoryContext::new_for_testing(());
1738 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1739 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1740 /// let subview = view.try_load_entry_mut(&23).await.unwrap();
1741 /// let value = subview.get();
1742 /// assert_eq!(*value, String::default());
1743 /// # })
1744 /// ```
1745 pub async fn try_load_entry_mut<Q>(
1746 &mut self,
1747 index: &Q,
1748 ) -> Result<WriteGuardedView<W>, ViewError>
1749 where
1750 I: Borrow<Q>,
1751 Q: CustomSerialize,
1752 {
1753 let short_key = index.to_custom_bytes()?;
1754 self.collection.try_load_entry_mut(&short_key).await
1755 }
1756
1757 /// Loads a subview at the given index in the collection and gives read-only access to the data.
1758 /// If an entry is absent then `None` is returned.
1759 /// ```rust
1760 /// # tokio_test::block_on(async {
1761 /// # use linera_views::context::MemoryContext;
1762 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1763 /// # use linera_views::register_view::RegisterView;
1764 /// # use linera_views::views::View;
1765 /// # let context = MemoryContext::new_for_testing(());
1766 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1767 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1768 /// {
1769 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1770 /// }
1771 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1772 /// let value = subview.get();
1773 /// assert_eq!(*value, String::default());
1774 /// # })
1775 /// ```
1776 pub async fn try_load_entry<Q>(
1777 &self,
1778 index: &Q,
1779 ) -> Result<Option<ReadGuardedView<W>>, ViewError>
1780 where
1781 I: Borrow<Q>,
1782 Q: CustomSerialize,
1783 {
1784 let short_key = index.to_custom_bytes()?;
1785 self.collection.try_load_entry(&short_key).await
1786 }
1787
1788 /// Returns `true` if the collection contains a value for the specified key.
1789 /// ```rust
1790 /// # tokio_test::block_on(async {
1791 /// # use linera_views::context::MemoryContext;
1792 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1793 /// # use linera_views::register_view::RegisterView;
1794 /// # use linera_views::views::View;
1795 /// # let context = MemoryContext::new_for_testing(());
1796 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1797 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1798 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1799 /// assert!(view.contains_key(&23).await.unwrap());
1800 /// assert!(!view.contains_key(&24).await.unwrap());
1801 /// # })
1802 /// ```
1803 pub async fn contains_key<Q>(&self, index: &Q) -> Result<bool, ViewError>
1804 where
1805 I: Borrow<Q>,
1806 Q: CustomSerialize,
1807 {
1808 let short_key = index.to_custom_bytes()?;
1809 self.collection.contains_key(&short_key).await
1810 }
1811
1812 /// Removes an entry. If absent then nothing happens.
1813 /// ```rust
1814 /// # tokio_test::block_on(async {
1815 /// # use linera_views::context::MemoryContext;
1816 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1817 /// # use linera_views::register_view::RegisterView;
1818 /// # use linera_views::views::View;
1819 /// # let context = MemoryContext::new_for_testing(());
1820 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1821 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1822 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1823 /// let value = subview.get_mut();
1824 /// assert_eq!(*value, String::default());
1825 /// view.remove_entry(&23);
1826 /// let keys = view.indices().await.unwrap();
1827 /// assert_eq!(keys.len(), 0);
1828 /// # })
1829 /// ```
1830 pub fn remove_entry<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1831 where
1832 I: Borrow<Q>,
1833 Q: CustomSerialize,
1834 {
1835 let short_key = index.to_custom_bytes()?;
1836 self.collection.remove_entry(short_key);
1837 Ok(())
1838 }
1839
1840 /// Marks the entry so that it is removed in the next flush.
1841 /// ```rust
1842 /// # tokio_test::block_on(async {
1843 /// # use linera_views::context::MemoryContext;
1844 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1845 /// # use linera_views::register_view::RegisterView;
1846 /// # use linera_views::views::View;
1847 /// # let context = MemoryContext::new_for_testing(());
1848 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1849 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1850 /// {
1851 /// let mut subview = view.try_load_entry_mut(&23).await.unwrap();
1852 /// let value = subview.get_mut();
1853 /// *value = String::from("Hello");
1854 /// }
1855 /// {
1856 /// view.try_reset_entry_to_default(&23).unwrap();
1857 /// let subview = view.try_load_entry(&23).await.unwrap().unwrap();
1858 /// let value = subview.get();
1859 /// assert_eq!(*value, String::default());
1860 /// }
1861 /// # })
1862 /// ```
1863 pub fn try_reset_entry_to_default<Q>(&mut self, index: &Q) -> Result<(), ViewError>
1864 where
1865 I: Borrow<Q>,
1866 Q: CustomSerialize,
1867 {
1868 let short_key = index.to_custom_bytes()?;
1869 self.collection.try_reset_entry_to_default(&short_key)
1870 }
1871
1872 /// Gets the extra data.
1873 pub fn extra(&self) -> &<W::Context as Context>::Extra {
1874 self.collection.extra()
1875 }
1876}
1877
1878impl<I, W: View> ReentrantCustomCollectionView<W::Context, I, W>
1879where
1880 I: Sync + Send + CustomSerialize,
1881{
1882 /// Load multiple entries for writing at once.
1883 /// The entries in indices have to be all distinct.
1884 /// ```rust
1885 /// # tokio_test::block_on(async {
1886 /// # use linera_views::context::MemoryContext;
1887 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1888 /// # use linera_views::register_view::RegisterView;
1889 /// # use linera_views::views::View;
1890 /// # let context = MemoryContext::new_for_testing(());
1891 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1892 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1893 /// let subviews = view.try_load_entries_mut(&[23, 42]).await.unwrap();
1894 /// let value1 = subviews[0].get();
1895 /// let value2 = subviews[1].get();
1896 /// assert_eq!(*value1, String::default());
1897 /// assert_eq!(*value2, String::default());
1898 /// # })
1899 /// ```
1900 pub async fn try_load_entries_mut<'a, Q>(
1901 &mut self,
1902 indices: impl IntoIterator<Item = &'a Q>,
1903 ) -> Result<Vec<WriteGuardedView<W>>, ViewError>
1904 where
1905 I: Borrow<Q>,
1906 Q: CustomSerialize + 'a,
1907 {
1908 let short_keys = indices
1909 .into_iter()
1910 .map(|index| index.to_custom_bytes())
1911 .collect::<Result<_, _>>()?;
1912 self.collection.try_load_entries_mut(short_keys).await
1913 }
1914
1915 /// Loads multiple entries for writing at once with their keys.
1916 /// The entries in indices have to be all distinct.
1917 /// ```rust
1918 /// # tokio_test::block_on(async {
1919 /// # use linera_views::context::MemoryContext;
1920 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1921 /// # use linera_views::register_view::RegisterView;
1922 /// # use linera_views::views::View;
1923 /// # let context = MemoryContext::new_for_testing(());
1924 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1925 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1926 /// let indices = [23, 42];
1927 /// let subviews = view.try_load_entries_pairs_mut(indices).await.unwrap();
1928 /// let value1 = subviews[0].1.get();
1929 /// let value2 = subviews[1].1.get();
1930 /// assert_eq!(*value1, String::default());
1931 /// assert_eq!(*value2, String::default());
1932 /// # })
1933 /// ```
1934 pub async fn try_load_entries_pairs_mut<Q>(
1935 &mut self,
1936 indices: impl IntoIterator<Item = Q>,
1937 ) -> Result<Vec<(Q, WriteGuardedView<W>)>, ViewError>
1938 where
1939 I: Borrow<Q>,
1940 Q: CustomSerialize + Clone,
1941 {
1942 let indices_vec: Vec<Q> = indices.into_iter().collect();
1943 let values = self.try_load_entries_mut(indices_vec.iter()).await?;
1944 Ok(indices_vec.into_iter().zip(values).collect())
1945 }
1946
1947 /// Load multiple entries for reading at once.
1948 /// The entries in indices have to be all distinct.
1949 /// ```rust
1950 /// # tokio_test::block_on(async {
1951 /// # use linera_views::context::MemoryContext;
1952 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1953 /// # use linera_views::register_view::RegisterView;
1954 /// # use linera_views::views::View;
1955 /// # let context = MemoryContext::new_for_testing(());
1956 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1957 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1958 /// {
1959 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1960 /// }
1961 /// let subviews = view.try_load_entries(&[23, 42]).await.unwrap();
1962 /// assert!(subviews[1].is_none());
1963 /// let value0 = subviews[0].as_ref().unwrap().get();
1964 /// assert_eq!(*value0, String::default());
1965 /// # })
1966 /// ```
1967 pub async fn try_load_entries<'a, Q>(
1968 &self,
1969 indices: impl IntoIterator<Item = &'a Q>,
1970 ) -> Result<Vec<Option<ReadGuardedView<W>>>, ViewError>
1971 where
1972 I: Borrow<Q>,
1973 Q: CustomSerialize + 'a,
1974 {
1975 let short_keys = indices
1976 .into_iter()
1977 .map(|index| index.to_custom_bytes())
1978 .collect::<Result<_, _>>()?;
1979 self.collection.try_load_entries(short_keys).await
1980 }
1981
1982 /// Loads multiple entries for reading at once with their keys.
1983 /// The entries in indices have to be all distinct.
1984 /// ```rust
1985 /// # tokio_test::block_on(async {
1986 /// # use linera_views::context::MemoryContext;
1987 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
1988 /// # use linera_views::register_view::RegisterView;
1989 /// # use linera_views::views::View;
1990 /// # let context = MemoryContext::new_for_testing(());
1991 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
1992 /// ReentrantCustomCollectionView::load(context).await.unwrap();
1993 /// {
1994 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
1995 /// }
1996 /// let indices = [23, 42];
1997 /// let subviews = view.try_load_entries_pairs(indices).await.unwrap();
1998 /// assert!(subviews[1].1.is_none());
1999 /// let value0 = subviews[0].1.as_ref().unwrap().get();
2000 /// assert_eq!(*value0, String::default());
2001 /// # })
2002 /// ```
2003 pub async fn try_load_entries_pairs<Q>(
2004 &self,
2005 indices: impl IntoIterator<Item = Q>,
2006 ) -> Result<Vec<(Q, Option<ReadGuardedView<W>>)>, ViewError>
2007 where
2008 I: Borrow<Q>,
2009 Q: CustomSerialize + Clone,
2010 {
2011 let indices_vec: Vec<Q> = indices.into_iter().collect();
2012 let values = self.try_load_entries(indices_vec.iter()).await?;
2013 Ok(indices_vec.into_iter().zip(values).collect())
2014 }
2015
2016 /// Loads all entries for writing at once.
2017 /// The entries in indices have to be all distinct.
2018 /// ```rust
2019 /// # tokio_test::block_on(async {
2020 /// # use linera_views::context::MemoryContext;
2021 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2022 /// # use linera_views::register_view::RegisterView;
2023 /// # use linera_views::views::View;
2024 /// # let context = MemoryContext::new_for_testing(());
2025 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2026 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2027 /// {
2028 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
2029 /// }
2030 /// let subviews = view.try_load_all_entries_mut().await.unwrap();
2031 /// assert_eq!(subviews.len(), 1);
2032 /// # })
2033 /// ```
2034 pub async fn try_load_all_entries_mut(
2035 &mut self,
2036 ) -> Result<Vec<(I, WriteGuardedView<W>)>, ViewError> {
2037 let results = self.collection.try_load_all_entries_mut().await?;
2038 results
2039 .into_iter()
2040 .map(|(short_key, view)| {
2041 let index = I::from_custom_bytes(&short_key)?;
2042 Ok((index, view))
2043 })
2044 .collect()
2045 }
2046
2047 /// Load multiple entries for reading at once.
2048 /// The entries in indices have to be all distinct.
2049 /// ```rust
2050 /// # tokio_test::block_on(async {
2051 /// # use linera_views::context::MemoryContext;
2052 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2053 /// # use linera_views::register_view::RegisterView;
2054 /// # use linera_views::views::View;
2055 /// # let context = MemoryContext::new_for_testing(());
2056 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2057 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2058 /// {
2059 /// let _subview = view.try_load_entry_mut(&23).await.unwrap();
2060 /// }
2061 /// let subviews = view.try_load_all_entries().await.unwrap();
2062 /// assert_eq!(subviews.len(), 1);
2063 /// # })
2064 /// ```
2065 pub async fn try_load_all_entries(&self) -> Result<Vec<(I, ReadGuardedView<W>)>, ViewError> {
2066 let results = self.collection.try_load_all_entries().await?;
2067 results
2068 .into_iter()
2069 .map(|(short_key, view)| {
2070 let index = I::from_custom_bytes(&short_key)?;
2071 Ok((index, view))
2072 })
2073 .collect()
2074 }
2075}
2076
2077impl<I, W> ReentrantCustomCollectionView<W::Context, I, W>
2078where
2079 W: View,
2080 I: Sync + Send + CustomSerialize,
2081{
2082 /// Returns the list of indices in the collection. The order is determined by
2083 /// the custom serialization.
2084 /// ```rust
2085 /// # tokio_test::block_on(async {
2086 /// # use linera_views::context::MemoryContext;
2087 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2088 /// # use linera_views::register_view::RegisterView;
2089 /// # use linera_views::views::View;
2090 /// # let context = MemoryContext::new_for_testing(());
2091 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2092 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2093 /// view.try_load_entry_mut(&23).await.unwrap();
2094 /// view.try_load_entry_mut(&25).await.unwrap();
2095 /// let indices = view.indices().await.unwrap();
2096 /// assert_eq!(indices, vec![23, 25]);
2097 /// # })
2098 /// ```
2099 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
2100 let mut indices = Vec::new();
2101 self.for_each_index(|index| {
2102 indices.push(index);
2103 Ok(())
2104 })
2105 .await?;
2106 Ok(indices)
2107 }
2108
2109 /// Returns the number of entries in the collection.
2110 /// ```rust
2111 /// # tokio_test::block_on(async {
2112 /// # use linera_views::context::MemoryContext;
2113 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2114 /// # use linera_views::register_view::RegisterView;
2115 /// # use linera_views::views::View;
2116 /// # let context = MemoryContext::new_for_testing(());
2117 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2118 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2119 /// view.try_load_entry_mut(&23).await.unwrap();
2120 /// view.try_load_entry_mut(&25).await.unwrap();
2121 /// assert_eq!(view.count().await.unwrap(), 2);
2122 /// # })
2123 /// ```
2124 pub async fn count(&self) -> Result<usize, ViewError> {
2125 self.collection.count().await
2126 }
2127
2128 /// Applies a function f on each index. Indices are visited in an order
2129 /// determined by the custom serialization. If the function f returns false
2130 /// then the loop ends prematurely.
2131 /// ```rust
2132 /// # tokio_test::block_on(async {
2133 /// # use linera_views::context::MemoryContext;
2134 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2135 /// # use linera_views::register_view::RegisterView;
2136 /// # use linera_views::views::View;
2137 /// # let context = MemoryContext::new_for_testing(());
2138 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2139 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2140 /// view.try_load_entry_mut(&28).await.unwrap();
2141 /// view.try_load_entry_mut(&24).await.unwrap();
2142 /// view.try_load_entry_mut(&23).await.unwrap();
2143 /// let mut part_indices = Vec::new();
2144 /// view.for_each_index_while(|index| {
2145 /// part_indices.push(index);
2146 /// Ok(part_indices.len() < 2)
2147 /// })
2148 /// .await
2149 /// .unwrap();
2150 /// assert_eq!(part_indices, vec![23, 24]);
2151 /// # })
2152 /// ```
2153 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
2154 where
2155 F: FnMut(I) -> Result<bool, ViewError> + Send,
2156 {
2157 self.collection
2158 .for_each_key_while(|key| {
2159 let index = I::from_custom_bytes(key)?;
2160 f(index)
2161 })
2162 .await?;
2163 Ok(())
2164 }
2165
2166 /// Applies a function f on each index. Indices are visited in an order
2167 /// determined by the custom serialization.
2168 /// ```rust
2169 /// # tokio_test::block_on(async {
2170 /// # use linera_views::context::MemoryContext;
2171 /// # use linera_views::reentrant_collection_view::ReentrantCustomCollectionView;
2172 /// # use linera_views::register_view::RegisterView;
2173 /// # use linera_views::views::View;
2174 /// # let context = MemoryContext::new_for_testing(());
2175 /// let mut view: ReentrantCustomCollectionView<_, u128, RegisterView<_, String>> =
2176 /// ReentrantCustomCollectionView::load(context).await.unwrap();
2177 /// view.try_load_entry_mut(&28).await.unwrap();
2178 /// view.try_load_entry_mut(&24).await.unwrap();
2179 /// view.try_load_entry_mut(&23).await.unwrap();
2180 /// let mut indices = Vec::new();
2181 /// view.for_each_index(|index| {
2182 /// indices.push(index);
2183 /// Ok(())
2184 /// })
2185 /// .await
2186 /// .unwrap();
2187 /// assert_eq!(indices, vec![23, 24, 28]);
2188 /// # })
2189 /// ```
2190 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
2191 where
2192 F: FnMut(I) -> Result<(), ViewError> + Send,
2193 {
2194 self.collection
2195 .for_each_key(|key| {
2196 let index = I::from_custom_bytes(key)?;
2197 f(index)
2198 })
2199 .await?;
2200 Ok(())
2201 }
2202}
2203
2204impl<I, W> HashableView for ReentrantCustomCollectionView<W::Context, I, W>
2205where
2206 W: HashableView,
2207 I: Send + Sync + CustomSerialize,
2208{
2209 type Hasher = sha3::Sha3_256;
2210
2211 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2212 self.collection.hash_mut().await
2213 }
2214
2215 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
2216 self.collection.hash().await
2217 }
2218}
2219
/// Type wrapping [`ReentrantByteCollectionView`] while memoizing the hash.
///
/// `C` is the context type and `W` the subview type.
pub type HashedReentrantByteCollectionView<C, W> =
    WrappedHashableContainerView<C, ReentrantByteCollectionView<C, W>, HasherOutput>;

/// Type wrapping [`ReentrantCollectionView`] while memoizing the hash.
///
/// `C` is the context type, `I` the index type and `W` the subview type.
pub type HashedReentrantCollectionView<C, I, W> =
    WrappedHashableContainerView<C, ReentrantCollectionView<C, I, W>, HasherOutput>;

/// Type wrapping [`ReentrantCustomCollectionView`] while memoizing the hash.
///
/// `C` is the context type, `I` the index type and `W` the subview type.
pub type HashedReentrantCustomCollectionView<C, I, W> =
    WrappedHashableContainerView<C, ReentrantCustomCollectionView<C, I, W>, HasherOutput>;

/// Wrapper around [`ReentrantByteCollectionView`] to compute hashes based on the history of changes.
///
/// `C` is the context type and `W` the subview type.
pub type HistoricallyHashedReentrantByteCollectionView<C, W> =
    HistoricallyHashableView<C, ReentrantByteCollectionView<C, W>>;

/// Wrapper around [`ReentrantCollectionView`] to compute hashes based on the history of changes.
///
/// `C` is the context type, `I` the index type and `W` the subview type.
pub type HistoricallyHashedReentrantCollectionView<C, I, W> =
    HistoricallyHashableView<C, ReentrantCollectionView<C, I, W>>;

/// Wrapper around [`ReentrantCustomCollectionView`] to compute hashes based on the history of changes.
///
/// `C` is the context type, `I` the index type and `W` the subview type.
pub type HistoricallyHashedReentrantCustomCollectionView<C, I, W> =
    HistoricallyHashableView<C, ReentrantCustomCollectionView<C, I, W>>;
2243
2244#[cfg(with_graphql)]
2245mod graphql {
2246 use std::borrow::Cow;
2247
2248 use super::{ReadGuardedView, ReentrantCollectionView};
2249 use crate::{
2250 graphql::{hash_name, mangle, missing_key_error, Entry, MapInput},
2251 views::View,
2252 };
2253
2254 impl<T: async_graphql::OutputType> async_graphql::OutputType for ReadGuardedView<T> {
2255 fn type_name() -> Cow<'static, str> {
2256 T::type_name()
2257 }
2258
2259 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
2260 T::create_type_info(registry)
2261 }
2262
2263 async fn resolve(
2264 &self,
2265 ctx: &async_graphql::ContextSelectionSet<'_>,
2266 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
2267 ) -> async_graphql::ServerResult<async_graphql::Value> {
2268 (**self).resolve(ctx, field).await
2269 }
2270 }
2271
2272 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2273 async_graphql::TypeName for ReentrantCollectionView<C, K, V>
2274 {
2275 fn type_name() -> Cow<'static, str> {
2276 format!(
2277 "ReentrantCollectionView_{}_{}_{:08x}",
2278 mangle(K::type_name()),
2279 mangle(V::type_name()),
2280 hash_name::<(K, V)>(),
2281 )
2282 .into()
2283 }
2284 }
2285
2286 #[async_graphql::Object(cache_control(no_cache), name_type)]
2287 impl<K, V> ReentrantCollectionView<V::Context, K, V>
2288 where
2289 K: async_graphql::InputType
2290 + async_graphql::OutputType
2291 + serde::ser::Serialize
2292 + serde::de::DeserializeOwned
2293 + std::fmt::Debug,
2294 V: View + async_graphql::OutputType,
2295 {
2296 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2297 Ok(self.indices().await?)
2298 }
2299
2300 #[graphql(derived(name = "count"))]
2301 async fn count_(&self) -> Result<u32, async_graphql::Error> {
2302 Ok(self.count().await? as u32)
2303 }
2304
2305 async fn entry(
2306 &self,
2307 key: K,
2308 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2309 let value = self
2310 .try_load_entry(&key)
2311 .await?
2312 .ok_or_else(|| missing_key_error(&key))?;
2313 Ok(Entry { value, key })
2314 }
2315
2316 async fn entries(
2317 &self,
2318 input: Option<MapInput<K>>,
2319 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
2320 let keys = if let Some(keys) = input
2321 .and_then(|input| input.filters)
2322 .and_then(|filters| filters.keys)
2323 {
2324 keys
2325 } else {
2326 self.indices().await?
2327 };
2328
2329 let values = self.try_load_entries(&keys).await?;
2330 Ok(values
2331 .into_iter()
2332 .zip(keys)
2333 .filter_map(|(value, key)| value.map(|value| Entry { value, key }))
2334 .collect())
2335 }
2336 }
2337
2338 use crate::reentrant_collection_view::ReentrantCustomCollectionView;
2339 impl<C: Send + Sync, K: async_graphql::OutputType, V: async_graphql::OutputType>
2340 async_graphql::TypeName for ReentrantCustomCollectionView<C, K, V>
2341 {
2342 fn type_name() -> Cow<'static, str> {
2343 format!(
2344 "ReentrantCustomCollectionView_{}_{}_{:08x}",
2345 mangle(K::type_name()),
2346 mangle(V::type_name()),
2347 hash_name::<(K, V)>(),
2348 )
2349 .into()
2350 }
2351 }
2352
2353 #[async_graphql::Object(cache_control(no_cache), name_type)]
2354 impl<K, V> ReentrantCustomCollectionView<V::Context, K, V>
2355 where
2356 K: async_graphql::InputType
2357 + async_graphql::OutputType
2358 + crate::common::CustomSerialize
2359 + std::fmt::Debug,
2360 V: View + async_graphql::OutputType,
2361 {
2362 async fn keys(&self) -> Result<Vec<K>, async_graphql::Error> {
2363 Ok(self.indices().await?)
2364 }
2365
2366 async fn entry(
2367 &self,
2368 key: K,
2369 ) -> Result<Entry<K, ReadGuardedView<V>>, async_graphql::Error> {
2370 let value = self
2371 .try_load_entry(&key)
2372 .await?
2373 .ok_or_else(|| missing_key_error(&key))?;
2374 Ok(Entry { value, key })
2375 }
2376
2377 async fn entries(
2378 &self,
2379 input: Option<MapInput<K>>,
2380 ) -> Result<Vec<Entry<K, ReadGuardedView<V>>>, async_graphql::Error> {
2381 let keys = if let Some(keys) = input
2382 .and_then(|input| input.filters)
2383 .and_then(|filters| filters.keys)
2384 {
2385 keys
2386 } else {
2387 self.indices().await?
2388 };
2389
2390 let values = self.try_load_entries(&keys).await?;
2391 Ok(values
2392 .into_iter()
2393 .zip(keys)
2394 .filter_map(|(value, key)| value.map(|value| Entry { value, key }))
2395 .collect())
2396 }
2397 }
2398}