1use std::{borrow::Borrow, collections::BTreeMap, marker::PhantomData, mem};
5
6#[cfg(with_metrics)]
7use linera_base::prometheus_util::MeasureLatency as _;
8use serde::{de::DeserializeOwned, Serialize};
9
10use crate::{
11 batch::Batch,
12 common::{CustomSerialize, HasherOutput, Update},
13 context::{BaseKey, Context},
14 hashable_wrapper::WrappedHashableContainerView,
15 historical_hash_wrapper::HistoricallyHashableView,
16 store::ReadableKeyValueStore as _,
17 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError},
18};
19
20#[cfg(with_metrics)]
21mod metrics {
22 use std::sync::LazyLock;
23
24 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
25 use prometheus::HistogramVec;
26
27 pub static SET_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
29 register_histogram_vec(
30 "set_view_hash_runtime",
31 "SetView hash runtime",
32 &[],
33 exponential_bucket_latencies(5.0),
34 )
35 });
36}
37
38#[derive(Debug)]
40pub struct ByteSetView<C> {
41 context: C,
42 delete_storage_first: bool,
43 updates: BTreeMap<Vec<u8>, Update<()>>,
44}
45
46impl<C: Context, C2: Context> ReplaceContext<C2> for ByteSetView<C> {
47 type Target = ByteSetView<C2>;
48
49 async fn with_context(
50 &mut self,
51 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
52 ) -> Self::Target {
53 ByteSetView {
54 context: ctx(self.context()),
55 delete_storage_first: self.delete_storage_first,
56 updates: self.updates.clone(),
57 }
58 }
59}
60
61impl<C: Context> View for ByteSetView<C> {
62 const NUM_INIT_KEYS: usize = 0;
63
64 type Context = C;
65
66 fn context(&self) -> &C {
67 &self.context
68 }
69
70 fn pre_load(_context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
71 Ok(Vec::new())
72 }
73
74 fn post_load(context: C, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
75 Ok(Self {
76 context,
77 delete_storage_first: false,
78 updates: BTreeMap::new(),
79 })
80 }
81
82 fn rollback(&mut self) {
83 self.delete_storage_first = false;
84 self.updates.clear();
85 }
86
87 async fn has_pending_changes(&self) -> bool {
88 if self.delete_storage_first {
89 return true;
90 }
91 !self.updates.is_empty()
92 }
93
94 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
95 let mut delete_view = false;
96 if self.delete_storage_first {
97 delete_view = true;
98 batch.delete_key_prefix(self.context.base_key().bytes.clone());
99 for (index, update) in mem::take(&mut self.updates) {
100 if let Update::Set(_) = update {
101 let key = self.context.base_key().base_index(&index);
102 batch.put_key_value_bytes(key, Vec::new());
103 delete_view = false;
104 }
105 }
106 } else {
107 for (index, update) in mem::take(&mut self.updates) {
108 let key = self.context.base_key().base_index(&index);
109 match update {
110 Update::Removed => batch.delete_key(key),
111 Update::Set(_) => batch.put_key_value_bytes(key, Vec::new()),
112 }
113 }
114 }
115 self.delete_storage_first = false;
116 Ok(delete_view)
117 }
118
119 fn clear(&mut self) {
120 self.delete_storage_first = true;
121 self.updates.clear();
122 }
123}
124
125impl<C: Context> ClonableView for ByteSetView<C> {
126 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
127 Ok(ByteSetView {
128 context: self.context.clone(),
129 delete_storage_first: self.delete_storage_first,
130 updates: self.updates.clone(),
131 })
132 }
133}
134
135impl<C: Context> ByteSetView<C> {
136 pub fn insert(&mut self, short_key: Vec<u8>) {
148 self.updates.insert(short_key, Update::Set(()));
149 }
150
151 pub fn remove(&mut self, short_key: Vec<u8>) {
163 if self.delete_storage_first {
164 self.updates.remove(&short_key);
166 } else {
167 self.updates.insert(short_key, Update::Removed);
168 }
169 }
170
171 pub fn extra(&self) -> &C::Extra {
173 self.context.extra()
174 }
175}
176
177impl<C: Context> ByteSetView<C> {
178 pub async fn contains(&self, short_key: &[u8]) -> Result<bool, ViewError> {
191 if let Some(update) = self.updates.get(short_key) {
192 let value = match update {
193 Update::Removed => false,
194 Update::Set(()) => true,
195 };
196 return Ok(value);
197 }
198 if self.delete_storage_first {
199 return Ok(false);
200 }
201 let key = self.context.base_key().base_index(short_key);
202 Ok(self.context.store().contains_key(&key).await?)
203 }
204}
205
206impl<C: Context> ByteSetView<C> {
207 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
220 let mut keys = Vec::new();
221 self.for_each_key(|key| {
222 keys.push(key.to_vec());
223 Ok(())
224 })
225 .await?;
226 Ok(keys)
227 }
228
229 pub async fn count(&self) -> Result<usize, ViewError> {
242 let mut count = 0;
243 self.for_each_key(|_key| {
244 count += 1;
245 Ok(())
246 })
247 .await?;
248 Ok(count)
249 }
250
251 pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
274 where
275 F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
276 {
277 let mut updates = self.updates.iter();
278 let mut update = updates.next();
279 if !self.delete_storage_first {
280 let base = &self.context.base_key().bytes;
281 for index in self.context.store().find_keys_by_prefix(base).await? {
282 loop {
283 match update {
284 Some((key, value)) if key <= &index => {
285 if let Update::Set(_) = value {
286 if !f(key)? {
287 return Ok(());
288 }
289 }
290 update = updates.next();
291 if key == &index {
292 break;
293 }
294 }
295 _ => {
296 if !f(&index)? {
297 return Ok(());
298 }
299 break;
300 }
301 }
302 }
303 }
304 }
305 while let Some((key, value)) = update {
306 if let Update::Set(_) = value {
307 if !f(key)? {
308 return Ok(());
309 }
310 }
311 update = updates.next();
312 }
313 Ok(())
314 }
315
316 pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
338 where
339 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
340 {
341 self.for_each_key_while(|key| {
342 f(key)?;
343 Ok(true)
344 })
345 .await
346 }
347}
348
349impl<C: Context> HashableView for ByteSetView<C> {
350 type Hasher = sha3::Sha3_256;
351
352 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
353 self.hash().await
354 }
355
356 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
357 #[cfg(with_metrics)]
358 let _hash_latency = metrics::SET_VIEW_HASH_RUNTIME.measure_latency();
359 let mut hasher = sha3::Sha3_256::default();
360 let mut count = 0u32;
361 self.for_each_key(|key| {
362 count += 1;
363 hasher.update_with_bytes(key)?;
364 Ok(())
365 })
366 .await?;
367 hasher.update_with_bcs_bytes(&count)?;
368 Ok(hasher.finalize())
369 }
370}
371
372#[derive(Debug)]
374pub struct SetView<C, I> {
375 set: ByteSetView<C>,
376 _phantom: PhantomData<I>,
377}
378
379impl<C: Context, I: Send + Sync + Serialize, C2: Context> ReplaceContext<C2> for SetView<C, I> {
380 type Target = SetView<C2, I>;
381
382 async fn with_context(
383 &mut self,
384 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
385 ) -> Self::Target {
386 SetView {
387 set: self.set.with_context(ctx).await,
388 _phantom: self._phantom,
389 }
390 }
391}
392
393impl<C: Context, I: Send + Sync + Serialize> View for SetView<C, I> {
394 const NUM_INIT_KEYS: usize = ByteSetView::<C>::NUM_INIT_KEYS;
395
396 type Context = C;
397
398 fn context(&self) -> &C {
399 self.set.context()
400 }
401
402 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
403 ByteSetView::<C>::pre_load(context)
404 }
405
406 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
407 let set = ByteSetView::post_load(context, values)?;
408 Ok(Self {
409 set,
410 _phantom: PhantomData,
411 })
412 }
413
414 fn rollback(&mut self) {
415 self.set.rollback()
416 }
417
418 async fn has_pending_changes(&self) -> bool {
419 self.set.has_pending_changes().await
420 }
421
422 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
423 self.set.flush(batch)
424 }
425
426 fn clear(&mut self) {
427 self.set.clear()
428 }
429}
430
431impl<C, I> ClonableView for SetView<C, I>
432where
433 C: Context,
434 I: Send + Sync + Serialize,
435{
436 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
437 Ok(SetView {
438 set: self.set.clone_unchecked()?,
439 _phantom: PhantomData,
440 })
441 }
442}
443
444impl<C: Context, I: Serialize> SetView<C, I> {
445 pub fn insert<Q>(&mut self, index: &Q) -> Result<(), ViewError>
458 where
459 I: Borrow<Q>,
460 Q: Serialize + ?Sized,
461 {
462 let short_key = BaseKey::derive_short_key(index)?;
463 self.set.insert(short_key);
464 Ok(())
465 }
466
467 pub fn remove<Q>(&mut self, index: &Q) -> Result<(), ViewError>
479 where
480 I: Borrow<Q>,
481 Q: Serialize + ?Sized,
482 {
483 let short_key = BaseKey::derive_short_key(index)?;
484 self.set.remove(short_key);
485 Ok(())
486 }
487
488 pub fn extra(&self) -> &C::Extra {
490 self.set.extra()
491 }
492}
493
494impl<C: Context, I: Serialize> SetView<C, I> {
495 pub async fn contains<Q>(&self, index: &Q) -> Result<bool, ViewError>
508 where
509 I: Borrow<Q>,
510 Q: Serialize + ?Sized,
511 {
512 let short_key = BaseKey::derive_short_key(index)?;
513 self.set.contains(&short_key).await
514 }
515}
516
517impl<C: Context, I: Serialize + DeserializeOwned + Send> SetView<C, I> {
518 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
530 let mut indices = Vec::new();
531 self.for_each_index(|index| {
532 indices.push(index);
533 Ok(())
534 })
535 .await?;
536 Ok(indices)
537 }
538
539 pub async fn count(&self) -> Result<usize, ViewError> {
551 self.set.count().await
552 }
553
554 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
578 where
579 F: FnMut(I) -> Result<bool, ViewError> + Send,
580 {
581 self.set
582 .for_each_key_while(|key| {
583 let index = BaseKey::deserialize_value(key)?;
584 f(index)
585 })
586 .await?;
587 Ok(())
588 }
589
590 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
613 where
614 F: FnMut(I) -> Result<(), ViewError> + Send,
615 {
616 self.set
617 .for_each_key(|key| {
618 let index = BaseKey::deserialize_value(key)?;
619 f(index)
620 })
621 .await?;
622 Ok(())
623 }
624}
625
626impl<C, I> HashableView for SetView<C, I>
627where
628 Self: View,
629 ByteSetView<C>: HashableView,
630{
631 type Hasher = <ByteSetView<C> as HashableView>::Hasher;
632
633 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
634 self.set.hash_mut().await
635 }
636
637 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
638 self.set.hash().await
639 }
640}
641
642#[derive(Debug)]
645pub struct CustomSetView<C, I> {
646 set: ByteSetView<C>,
647 _phantom: PhantomData<I>,
648}
649
650impl<C, I> View for CustomSetView<C, I>
651where
652 C: Context,
653 I: Send + Sync + CustomSerialize,
654{
655 const NUM_INIT_KEYS: usize = ByteSetView::<C>::NUM_INIT_KEYS;
656
657 type Context = C;
658
659 fn context(&self) -> &C {
660 self.set.context()
661 }
662
663 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
664 ByteSetView::pre_load(context)
665 }
666
667 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
668 let set = ByteSetView::post_load(context, values)?;
669 Ok(Self {
670 set,
671 _phantom: PhantomData,
672 })
673 }
674
675 fn rollback(&mut self) {
676 self.set.rollback()
677 }
678
679 async fn has_pending_changes(&self) -> bool {
680 self.set.has_pending_changes().await
681 }
682
683 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
684 self.set.flush(batch)
685 }
686
687 fn clear(&mut self) {
688 self.set.clear()
689 }
690}
691
692impl<C, I> ClonableView for CustomSetView<C, I>
693where
694 C: Context,
695 I: Send + Sync + CustomSerialize,
696{
697 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
698 Ok(CustomSetView {
699 set: self.set.clone_unchecked()?,
700 _phantom: PhantomData,
701 })
702 }
703}
704
705impl<C: Context, I: CustomSerialize> CustomSetView<C, I> {
706 pub fn insert<Q>(&mut self, index: &Q) -> Result<(), ViewError>
719 where
720 I: Borrow<Q>,
721 Q: CustomSerialize,
722 {
723 let short_key = index.to_custom_bytes()?;
724 self.set.insert(short_key);
725 Ok(())
726 }
727
728 pub fn remove<Q>(&mut self, index: &Q) -> Result<(), ViewError>
741 where
742 I: Borrow<Q>,
743 Q: CustomSerialize,
744 {
745 let short_key = index.to_custom_bytes()?;
746 self.set.remove(short_key);
747 Ok(())
748 }
749
750 pub fn extra(&self) -> &C::Extra {
752 self.set.extra()
753 }
754}
755
756impl<C, I> CustomSetView<C, I>
757where
758 C: Context,
759 I: CustomSerialize,
760{
761 pub async fn contains<Q>(&self, index: &Q) -> Result<bool, ViewError>
775 where
776 I: Borrow<Q>,
777 Q: CustomSerialize,
778 {
779 let short_key = index.to_custom_bytes()?;
780 self.set.contains(&short_key).await
781 }
782}
783
784impl<C, I> CustomSetView<C, I>
785where
786 C: Context,
787 I: Sync + Send + CustomSerialize,
788{
789 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
804 let mut indices = Vec::new();
805 self.for_each_index(|index| {
806 indices.push(index);
807 Ok(())
808 })
809 .await?;
810 Ok(indices)
811 }
812
813 pub async fn count(&self) -> Result<usize, ViewError> {
827 self.set.count().await
828 }
829
830 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
854 where
855 F: FnMut(I) -> Result<bool, ViewError> + Send,
856 {
857 self.set
858 .for_each_key_while(|key| {
859 let index = I::from_custom_bytes(key)?;
860 f(index)
861 })
862 .await?;
863 Ok(())
864 }
865
866 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
889 where
890 F: FnMut(I) -> Result<(), ViewError> + Send,
891 {
892 self.set
893 .for_each_key(|key| {
894 let index = I::from_custom_bytes(key)?;
895 f(index)
896 })
897 .await?;
898 Ok(())
899 }
900}
901
902impl<C: Context, I> HashableView for CustomSetView<C, I>
903where
904 Self: View,
905{
906 type Hasher = sha3::Sha3_256;
907
908 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
909 self.set.hash_mut().await
910 }
911
912 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
913 self.set.hash().await
914 }
915}
916
917pub type HashedByteSetView<C> = WrappedHashableContainerView<C, ByteSetView<C>, HasherOutput>;
919
920pub type HistoricallyHashedByteSetView<C> = HistoricallyHashableView<C, ByteSetView<C>>;
922
923pub type HashedSetView<C, I> = WrappedHashableContainerView<C, SetView<C, I>, HasherOutput>;
925
926pub type HistoricallyHashedSetView<C, I> = HistoricallyHashableView<C, SetView<C, I>>;
928
929pub type HashedCustomSetView<C, I> =
931 WrappedHashableContainerView<C, CustomSetView<C, I>, HasherOutput>;
932
933pub type HistoricallyHashedCustomSetView<C, I> = HistoricallyHashableView<C, CustomSetView<C, I>>;
935
936#[cfg(with_graphql)]
937mod graphql {
938 use std::borrow::Cow;
939
940 use serde::{de::DeserializeOwned, Serialize};
941
942 use super::{CustomSetView, SetView};
943 use crate::{
944 common::CustomSerialize,
945 context::Context,
946 graphql::{hash_name, mangle},
947 };
948
949 impl<C: Send + Sync, I: async_graphql::OutputType> async_graphql::TypeName for SetView<C, I> {
950 fn type_name() -> Cow<'static, str> {
951 format!(
952 "SetView_{}_{:08x}",
953 mangle(I::type_name()),
954 hash_name::<I>(),
955 )
956 .into()
957 }
958 }
959
960 #[async_graphql::Object(cache_control(no_cache), name_type)]
961 impl<C, I> SetView<C, I>
962 where
963 C: Context,
964 I: Send + Sync + Serialize + DeserializeOwned + async_graphql::OutputType,
965 {
966 async fn elements(&self, count: Option<usize>) -> Result<Vec<I>, async_graphql::Error> {
967 let mut indices = self.indices().await?;
968 if let Some(count) = count {
969 indices.truncate(count);
970 }
971 Ok(indices)
972 }
973
974 #[graphql(derived(name = "count"))]
975 async fn count_(&self) -> Result<u32, async_graphql::Error> {
976 Ok(self.count().await? as u32)
977 }
978 }
979
980 impl<C: Send + Sync, I: async_graphql::OutputType> async_graphql::TypeName for CustomSetView<C, I> {
981 fn type_name() -> Cow<'static, str> {
982 format!(
983 "CustomSetView_{}_{:08x}",
984 mangle(I::type_name()),
985 hash_name::<I>(),
986 )
987 .into()
988 }
989 }
990
991 #[async_graphql::Object(cache_control(no_cache), name_type)]
992 impl<C, I> CustomSetView<C, I>
993 where
994 C: Context,
995 I: Send + Sync + CustomSerialize + async_graphql::OutputType,
996 {
997 async fn elements(&self, count: Option<usize>) -> Result<Vec<I>, async_graphql::Error> {
998 let mut indices = self.indices().await?;
999 if let Some(count) = count {
1000 indices.truncate(count);
1001 }
1002 Ok(indices)
1003 }
1004
1005 #[graphql(derived(name = "count"))]
1006 async fn count_(&self) -> Result<u32, async_graphql::Error> {
1007 Ok(self.count().await? as u32)
1008 }
1009 }
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014 use super::*;
1015 use crate::{context::MemoryContext, store::WritableKeyValueStore as _};
1016
1017 #[tokio::test]
1018 async fn test_byte_set_view_flush_with_delete_storage_first_and_set_updates(
1019 ) -> Result<(), ViewError> {
1020 let context = MemoryContext::new_for_testing(());
1021 let mut set = ByteSetView::load(context).await?;
1022 assert!(!set.has_pending_changes().await);
1024
1025 set.insert(vec![1, 2, 3]);
1027 set.insert(vec![4, 5, 6]);
1028 assert!(set.has_pending_changes().await);
1030
1031 assert_eq!(set.keys().await?, vec![vec![1, 2, 3], vec![4, 5, 6]]);
1033
1034 let mut batch = Batch::new();
1035 set.flush(&mut batch)?;
1036 set.context().store().write_batch(batch).await?;
1037 assert!(!set.has_pending_changes().await);
1039
1040 assert_eq!(set.keys().await?, vec![vec![1, 2, 3], vec![4, 5, 6]]);
1042 assert_eq!(set.count().await?, 2);
1043
1044 set.clear();
1046 assert!(set.has_pending_changes().await);
1048
1049 assert!(set.keys().await?.is_empty());
1051
1052 set.insert(vec![7, 8, 9]);
1054 set.insert(vec![10, 11, 12]);
1055 assert!(set.has_pending_changes().await);
1057
1058 assert_eq!(set.keys().await?, vec![vec![7, 8, 9], vec![10, 11, 12]]);
1060
1061 let mut batch = Batch::new();
1063 let delete_view = set.flush(&mut batch)?;
1064 assert!(!delete_view);
1067 assert!(!batch.is_empty());
1069
1070 set.context().store().write_batch(batch).await?;
1072 assert!(!set.has_pending_changes().await);
1074
1075 let new_set = ByteSetView::load(set.context().clone()).await?;
1077 assert!(new_set.contains(&[7, 8, 9]).await?);
1078 assert!(new_set.contains(&[10, 11, 12]).await?);
1079 assert!(!new_set.contains(&[1, 2, 3]).await?);
1080 assert!(!new_set.contains(&[4, 5, 6]).await?);
1081 assert!(!new_set.has_pending_changes().await);
1083
1084 Ok(())
1085 }
1086
1087 #[tokio::test]
1088 async fn test_byte_set_view_flush_with_delete_storage_first_no_set_updates(
1089 ) -> Result<(), ViewError> {
1090 let context = MemoryContext::new_for_testing(());
1091 let mut set = ByteSetView::load(context).await?;
1092
1093 set.insert(vec![1, 2, 3]);
1095 let mut batch = Batch::new();
1096 set.flush(&mut batch)?;
1097 set.context().store().write_batch(batch).await?;
1098
1099 set.clear();
1101 let mut batch = Batch::new();
1102 let delete_view = set.flush(&mut batch)?;
1103
1104 assert!(delete_view);
1106
1107 Ok(())
1108 }
1109
1110 #[tokio::test]
1111 async fn test_byte_set_view_flush_with_delete_storage_first_mixed_updates(
1112 ) -> Result<(), ViewError> {
1113 let context = MemoryContext::new_for_testing(());
1114 let mut set = ByteSetView::load(context).await?;
1115
1116 set.insert(vec![1, 2, 3]);
1118 set.insert(vec![4, 5, 6]);
1119 let mut batch = Batch::new();
1120 set.flush(&mut batch)?;
1121 set.context().store().write_batch(batch).await?;
1122
1123 set.clear();
1125
1126 set.insert(vec![7, 8, 9]); set.remove(vec![10, 11, 12]); let mut batch = Batch::new();
1131 let delete_view = set.flush(&mut batch)?;
1132
1133 assert!(!delete_view);
1135
1136 Ok(())
1137 }
1138
1139 #[tokio::test]
1140 async fn test_has_pending_changes_comprehensive() -> Result<(), ViewError> {
1141 let context = MemoryContext::new_for_testing(());
1142 let mut set = ByteSetView::load(context).await?;
1143
1144 assert!(!set.has_pending_changes().await);
1146
1147 set.insert(vec![1]);
1149 assert!(set.has_pending_changes().await);
1150
1151 set.insert(vec![2]);
1153 set.insert(vec![3]);
1154 assert!(set.has_pending_changes().await);
1155
1156 let mut batch = Batch::new();
1158 set.flush(&mut batch)?;
1159 set.context().store().write_batch(batch).await?;
1160 assert!(!set.has_pending_changes().await);
1161
1162 set.remove(vec![1]);
1164 assert!(set.has_pending_changes().await);
1165
1166 set.clear();
1168 assert!(set.has_pending_changes().await);
1169
1170 set.insert(vec![4]);
1172 assert!(set.has_pending_changes().await);
1173
1174 set.rollback();
1176 assert!(!set.has_pending_changes().await);
1177
1178 assert!(set.contains(&[2]).await?);
1180 assert!(set.contains(&[3]).await?);
1181
1182 Ok(())
1183 }
1184
1185 #[tokio::test]
1186 async fn test_for_each_key_while_match_update_pattern() -> Result<(), ViewError> {
1187 let context = MemoryContext::new_for_testing(());
1188 let mut set = ByteSetView::load(context).await?;
1189
1190 set.insert(vec![1]);
1192 set.insert(vec![3]);
1193 set.insert(vec![5]);
1194 let mut batch = Batch::new();
1195 set.flush(&mut batch)?;
1196 set.context().store().write_batch(batch).await?;
1197
1198 set.insert(vec![2]); set.insert(vec![4]); let mut keys_processed = Vec::new();
1203
1204 set.for_each_key_while(|key| {
1207 keys_processed.push(key.to_vec());
1208 Ok(true) })
1210 .await?;
1211
1212 assert_eq!(
1214 keys_processed,
1215 vec![vec![1], vec![2], vec![3], vec![4], vec![5]]
1216 );
1217
1218 Ok(())
1219 }
1220
1221 #[tokio::test]
1222 async fn test_for_each_key_while_early_return() -> Result<(), ViewError> {
1223 let context = MemoryContext::new_for_testing(());
1224 let mut set = ByteSetView::load(context).await?;
1225
1226 set.insert(vec![1]);
1228 set.insert(vec![2]);
1229 set.insert(vec![3]);
1230 let mut batch = Batch::new();
1231 set.flush(&mut batch)?;
1232 set.context().store().write_batch(batch).await?;
1233
1234 let mut count = 0;
1235
1236 set.for_each_key_while(|_key| {
1238 count += 1;
1239 if count >= 2 {
1240 Ok(false) } else {
1242 Ok(true)
1243 }
1244 })
1245 .await?;
1246
1247 assert_eq!(count, 2);
1249
1250 Ok(())
1251 }
1252
1253 #[tokio::test]
1254 async fn test_hash_mut_delegation() -> Result<(), ViewError> {
1255 let context = MemoryContext::new_for_testing(());
1256 let mut set = ByteSetView::load(context).await?;
1257
1258 set.insert(vec![1, 2, 3]);
1260 set.insert(vec![4, 5, 6]);
1261
1262 let hash1 = set.hash_mut().await?;
1264 let hash2 = set.hash().await?;
1265
1266 assert_eq!(hash1, hash2);
1268
1269 set.insert(vec![7, 8, 9]);
1271 let hash3 = set.hash_mut().await?;
1272 assert_ne!(hash1, hash3);
1273
1274 Ok(())
1275 }
1276
1277 #[tokio::test]
1278 async fn test_for_each_key_while_early_return_on_update_set() -> Result<(), ViewError> {
1279 let context = MemoryContext::new_for_testing(());
1280 let mut set = ByteSetView::load(context).await?;
1281
1282 set.insert(vec![1]);
1284 set.insert(vec![3]);
1285 let mut batch = Batch::new();
1286 set.flush(&mut batch)?;
1287 set.context().store().write_batch(batch).await?;
1288
1289 set.insert(vec![0]); set.insert(vec![2]); let mut count = 0;
1294
1295 set.for_each_key_while(|key| {
1298 count += 1;
1299 if key == [0] {
1300 Ok(false) } else {
1302 Ok(true)
1303 }
1304 })
1305 .await?;
1306
1307 assert_eq!(count, 1);
1309
1310 Ok(())
1311 }
1312
1313 #[tokio::test]
1314 async fn test_for_each_key_while_early_return_in_remaining_updates() -> Result<(), ViewError> {
1315 let context = MemoryContext::new_for_testing(());
1316 let mut set = ByteSetView::load(context).await?;
1317
1318 set.insert(vec![1]);
1321 set.insert(vec![2]);
1322 set.insert(vec![3]);
1323
1324 let mut count = 0;
1325
1326 set.for_each_key_while(|key| {
1328 count += 1;
1329 if key == [2] {
1330 Ok(false) } else {
1332 Ok(true)
1333 }
1334 })
1335 .await?;
1336
1337 assert_eq!(count, 2);
1339
1340 Ok(())
1341 }
1342
1343 #[tokio::test]
1344 async fn test_contains_update_removed_returns_false() -> Result<(), ViewError> {
1345 let context = MemoryContext::new_for_testing(());
1346 let mut set = ByteSetView::load(context).await?;
1347
1348 assert!(!set.has_pending_changes().await);
1350
1351 set.insert(vec![1, 2, 3]);
1353
1354 assert!(set.has_pending_changes().await);
1356
1357 assert_eq!(set.keys().await?, vec![vec![1, 2, 3]]);
1359
1360 let mut batch = Batch::new();
1361 set.flush(&mut batch)?;
1362 set.context().store().write_batch(batch).await?;
1363
1364 assert!(!set.has_pending_changes().await);
1366
1367 assert_eq!(set.keys().await?, vec![vec![1, 2, 3]]);
1369
1370 assert!(set.contains(&[1, 2, 3]).await?);
1372
1373 set.remove(vec![1, 2, 3]);
1375
1376 assert!(set.has_pending_changes().await);
1378
1379 assert!(set.keys().await?.is_empty());
1381
1382 assert!(!set.contains(&[1, 2, 3]).await?);
1385
1386 Ok(())
1387 }
1388
1389 #[tokio::test]
1390 async fn test_contains_delete_storage_first_returns_false() -> Result<(), ViewError> {
1391 let context = MemoryContext::new_for_testing(());
1392 let mut set = ByteSetView::load(context).await?;
1393
1394 set.insert(vec![1]);
1396 set.insert(vec![2]);
1397 set.insert(vec![3]);
1398 let mut batch = Batch::new();
1399 set.flush(&mut batch)?;
1400 set.context().store().write_batch(batch).await?;
1401
1402 assert!(set.contains(&[1]).await?);
1404 assert!(set.contains(&[2]).await?);
1405 assert!(set.contains(&[3]).await?);
1406
1407 set.clear();
1409
1410 assert!(!set.contains(&[1]).await?);
1413 assert!(!set.contains(&[2]).await?);
1414 assert!(!set.contains(&[3]).await?);
1415
1416 assert!(!set.contains(&[99]).await?);
1418
1419 Ok(())
1420 }
1421
1422 #[tokio::test]
1423 async fn test_contains_delete_storage_first_with_new_additions() -> Result<(), ViewError> {
1424 let context = MemoryContext::new_for_testing(());
1425 let mut set = ByteSetView::load(context).await?;
1426
1427 set.insert(vec![1]);
1429 set.insert(vec![2]);
1430 let mut batch = Batch::new();
1431 set.flush(&mut batch)?;
1432 set.context().store().write_batch(batch).await?;
1433
1434 set.clear();
1436
1437 set.insert(vec![3]);
1439 set.insert(vec![4]);
1440
1441 assert!(!set.contains(&[1]).await?); assert!(!set.contains(&[2]).await?); assert!(set.contains(&[3]).await?); assert!(set.contains(&[4]).await?); Ok(())
1449 }
1450
1451 #[tokio::test]
1452 async fn test_for_each_key_while_update_set_processing_in_stored_loop() -> Result<(), ViewError>
1453 {
1454 let context = MemoryContext::new_for_testing(());
1455 let mut set = ByteSetView::load(context).await?;
1456
1457 set.insert(vec![2]);
1459 set.insert(vec![4]);
1460 set.insert(vec![6]);
1461 let mut batch = Batch::new();
1462 set.flush(&mut batch)?;
1463 set.context().store().write_batch(batch).await?;
1464
1465 set.insert(vec![1]); set.insert(vec![3]); set.remove(vec![5]); let mut processed_keys = Vec::new();
1471
1472 set.for_each_key_while(|key| {
1475 processed_keys.push(key.to_vec());
1476 Ok(true)
1477 })
1478 .await?;
1479
1480 assert_eq!(
1483 processed_keys,
1484 vec![vec![1], vec![2], vec![3], vec![4], vec![6]]
1485 );
1486
1487 Ok(())
1488 }
1489
1490 #[tokio::test]
1491 async fn test_set_view_flush_with_delete_storage_first_and_set_updates() -> Result<(), ViewError>
1492 {
1493 let context = MemoryContext::new_for_testing(());
1494 let mut set = SetView::<_, u32>::load(context).await?;
1495
1496 set.insert(&42)?;
1498 set.insert(&84)?;
1499 let mut batch = Batch::new();
1500 set.flush(&mut batch)?;
1501 set.context().store().write_batch(batch).await?;
1502
1503 set.clear();
1505
1506 set.insert(&123)?;
1508 set.insert(&456)?;
1509
1510 let mut batch = Batch::new();
1511 let delete_view = set.flush(&mut batch)?;
1512
1513 assert!(!delete_view);
1515
1516 set.context().store().write_batch(batch).await?;
1518 let new_set = SetView::<_, u32>::load(set.context().clone()).await?;
1519 assert!(new_set.contains(&123).await?);
1520 assert!(new_set.contains(&456).await?);
1521 assert!(!new_set.contains(&42).await?);
1522 assert!(!new_set.contains(&84).await?);
1523
1524 Ok(())
1525 }
1526
1527 #[tokio::test]
1528 async fn test_set_view_count_delegation() -> Result<(), ViewError> {
1529 let context = MemoryContext::new_for_testing(());
1530 let mut set = SetView::<_, u32>::load(context).await?;
1531
1532 assert!(!set.has_pending_changes().await);
1534
1535 assert_eq!(set.count().await?, 0);
1537
1538 set.insert(&42)?;
1540
1541 assert!(set.has_pending_changes().await);
1543
1544 assert_eq!(set.indices().await?, vec![42]);
1546
1547 set.insert(&84)?;
1548 set.insert(&126)?;
1549
1550 assert!(set.has_pending_changes().await);
1552
1553 assert_eq!(set.indices().await?, vec![42, 84, 126]);
1555
1556 assert_eq!(set.count().await?, 3);
1558
1559 Ok(())
1560 }
1561
1562 #[tokio::test]
1563 async fn test_set_view_hash_mut_delegation() -> Result<(), ViewError> {
1564 let context = MemoryContext::new_for_testing(());
1565 let mut set = SetView::<_, u32>::load(context).await?;
1566
1567 set.insert(&42)?;
1569 set.insert(&84)?;
1570 set.insert(&126)?;
1571
1572 let hash1 = set.hash_mut().await?;
1574 let hash2 = set.hash().await?;
1575
1576 assert_eq!(hash1, hash2);
1578
1579 set.insert(&168)?;
1581 let hash3 = set.hash_mut().await?;
1582 assert_ne!(hash1, hash3);
1583
1584 let context2 = MemoryContext::new_for_testing(());
1586 let mut byte_set = ByteSetView::load(context2).await?;
1587
1588 use crate::context::BaseKey;
1590 byte_set.insert(BaseKey::derive_short_key(&42u32)?);
1591 byte_set.insert(BaseKey::derive_short_key(&84u32)?);
1592 byte_set.insert(BaseKey::derive_short_key(&126u32)?);
1593 byte_set.insert(BaseKey::derive_short_key(&168u32)?);
1594
1595 let byte_set_hash = byte_set.hash_mut().await?;
1596 assert_eq!(hash3, byte_set_hash);
1597
1598 Ok(())
1599 }
1600
1601 #[tokio::test]
1603 async fn test_custom_set_view_flush_with_delete_storage_first_and_set_updates(
1604 ) -> Result<(), ViewError> {
1605 let context = MemoryContext::new_for_testing(());
1606 let mut set = CustomSetView::<_, u128>::load(context).await?;
1607
1608 assert!(!set.has_pending_changes().await);
1610
1611 set.insert(&42u128)?;
1613 set.insert(&84u128)?;
1614
1615 assert!(set.has_pending_changes().await);
1617
1618 assert_eq!(set.indices().await?, vec![42u128, 84u128]);
1620
1621 let mut batch = Batch::new();
1622 set.flush(&mut batch)?;
1623 set.context().store().write_batch(batch).await?;
1624
1625 assert!(!set.has_pending_changes().await);
1627
1628 assert_eq!(set.indices().await?, vec![42u128, 84u128]);
1630
1631 set.clear();
1633
1634 assert!(set.has_pending_changes().await);
1636
1637 assert!(set.indices().await?.is_empty());
1639
1640 set.insert(&123u128)?;
1642 set.insert(&456u128)?;
1643
1644 assert!(set.has_pending_changes().await);
1646
1647 assert_eq!(set.indices().await?, vec![123u128, 456u128]);
1649
1650 let mut batch = Batch::new();
1651 let delete_view = set.flush(&mut batch)?;
1652
1653 assert!(!delete_view);
1655
1656 set.context().store().write_batch(batch).await?;
1658
1659 assert!(!set.has_pending_changes().await);
1661
1662 let new_set = CustomSetView::<_, u128>::load(set.context().clone()).await?;
1663 assert!(new_set.contains(&123u128).await?);
1664 assert!(new_set.contains(&456u128).await?);
1665 assert!(!new_set.contains(&42u128).await?);
1666 assert!(!new_set.contains(&84u128).await?);
1667
1668 assert!(!new_set.has_pending_changes().await);
1670
1671 Ok(())
1672 }
1673
1674 #[tokio::test]
1675 async fn test_custom_set_view_contains_update_removed_returns_false() -> Result<(), ViewError> {
1676 let context = MemoryContext::new_for_testing(());
1677 let mut set = CustomSetView::<_, u128>::load(context).await?;
1678
1679 set.insert(&12345u128)?;
1681 let mut batch = Batch::new();
1682 set.flush(&mut batch)?;
1683 set.context().store().write_batch(batch).await?;
1684
1685 assert!(set.contains(&12345u128).await?);
1687
1688 set.remove(&12345u128)?;
1690
1691 assert!(!set.contains(&12345u128).await?);
1693
1694 Ok(())
1695 }
1696
1697 #[tokio::test]
1698 async fn test_custom_set_view_contains_delete_storage_first_returns_false(
1699 ) -> Result<(), ViewError> {
1700 let context = MemoryContext::new_for_testing(());
1701 let mut set = CustomSetView::<_, u128>::load(context).await?;
1702
1703 set.insert(&111u128)?;
1705 set.insert(&222u128)?;
1706 set.insert(&333u128)?;
1707 let mut batch = Batch::new();
1708 set.flush(&mut batch)?;
1709 set.context().store().write_batch(batch).await?;
1710
1711 assert!(set.contains(&111u128).await?);
1713 assert!(set.contains(&222u128).await?);
1714
1715 set.clear();
1717
1718 assert!(!set.contains(&111u128).await?);
1720 assert!(!set.contains(&222u128).await?);
1721 assert!(!set.contains(&333u128).await?);
1722
1723 Ok(())
1724 }
1725
1726 #[tokio::test]
1727 async fn test_custom_set_view_hash_mut_delegation() -> Result<(), ViewError> {
1728 let context = MemoryContext::new_for_testing(());
1729 let mut set = CustomSetView::<_, u128>::load(context).await?;
1730
1731 set.insert(&1000u128)?;
1733 set.insert(&2000u128)?;
1734
1735 let hash1 = set.hash_mut().await?;
1737 let hash2 = set.hash().await?;
1738
1739 assert_eq!(hash1, hash2);
1741
1742 set.insert(&3000u128)?;
1744 let hash3 = set.hash_mut().await?;
1745 assert_ne!(hash1, hash3);
1746
1747 Ok(())
1748 }
1749
1750 #[tokio::test]
1751 async fn test_custom_set_view_for_each_index_while_method_signature() -> Result<(), ViewError> {
1752 let context = MemoryContext::new_for_testing(());
1753 let mut set = CustomSetView::<_, u128>::load(context).await?;
1754 assert_eq!(set.count().await?, 0);
1755
1756 set.insert(&100u128)?;
1758 set.insert(&200u128)?;
1759 set.insert(&300u128)?;
1760
1761 assert_eq!(set.count().await?, 3);
1762
1763 let mut collected_indices = Vec::new();
1764
1765 set.for_each_index_while(|index| {
1768 collected_indices.push(index);
1769 Ok(true)
1770 })
1771 .await?;
1772
1773 assert_eq!(collected_indices, vec![100u128, 200u128, 300u128]);
1775
1776 Ok(())
1777 }
1778
1779 #[tokio::test]
1780 async fn test_custom_set_view_rollback() -> Result<(), ViewError> {
1781 let context = MemoryContext::new_for_testing(());
1782 let mut set = CustomSetView::<_, u128>::load(context).await?;
1783
1784 set.insert(&100u128)?;
1786 set.insert(&200u128)?;
1787 let mut batch = Batch::new();
1788 set.flush(&mut batch)?;
1789 set.context().store().write_batch(batch).await?;
1790
1791 assert!(set.contains(&100u128).await?);
1793 assert!(set.contains(&200u128).await?);
1794 assert!(!set.has_pending_changes().await);
1795
1796 set.insert(&300u128)?;
1798 set.remove(&100u128)?;
1799 assert!(set.has_pending_changes().await);
1800
1801 assert!(set.contains(&300u128).await?);
1803 assert!(!set.contains(&100u128).await?);
1804
1805 set.rollback();
1807
1808 assert!(!set.has_pending_changes().await);
1810 assert!(set.contains(&100u128).await?);
1811 assert!(set.contains(&200u128).await?);
1812 assert!(!set.contains(&300u128).await?);
1813
1814 Ok(())
1815 }
1816
1817 #[tokio::test]
1818 async fn test_custom_set_view_clone_unchecked() -> Result<(), ViewError> {
1819 let context = MemoryContext::new_for_testing(());
1820 let mut set = CustomSetView::<_, u128>::load(context).await?;
1821
1822 set.insert(&42u128)?;
1824 set.insert(&84u128)?;
1825
1826 let mut cloned_set = set.clone_unchecked()?;
1828
1829 assert!(cloned_set.contains(&42u128).await?);
1831 assert!(cloned_set.contains(&84u128).await?);
1832
1833 cloned_set.insert(&126u128)?;
1835 assert!(cloned_set.contains(&126u128).await?);
1836 assert!(!set.contains(&126u128).await?);
1837
1838 set.insert(&168u128)?;
1840 assert!(set.contains(&168u128).await?);
1841 assert!(!cloned_set.contains(&168u128).await?);
1842
1843 Ok(())
1844 }
1845
1846 #[cfg(with_graphql)]
1847 mod graphql_tests {
1848 use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema};
1849
1850 use super::*;
1851
1852 struct Query;
1854
1855 #[Object]
1856 impl Query {
1857 async fn test_set(&self) -> TestSetView {
1858 let context = MemoryContext::new_for_testing(());
1859 let mut set = SetView::<_, u32>::load(context).await.unwrap();
1860
1861 set.insert(&42).unwrap();
1863 set.insert(&84).unwrap();
1864 set.insert(&126).unwrap();
1865 set.insert(&168).unwrap();
1866 set.insert(&210).unwrap();
1867
1868 TestSetView { set }
1869 }
1870 }
1871
1872 struct TestSetView {
1873 set: SetView<MemoryContext<()>, u32>,
1874 }
1875
1876 #[Object]
1877 impl TestSetView {
1878 async fn elements(
1879 &self,
1880 count: Option<usize>,
1881 ) -> Result<Vec<u32>, async_graphql::Error> {
1882 let mut indices = self.set.indices().await?;
1884 if let Some(count) = count {
1885 indices.truncate(count);
1887 }
1888 Ok(indices)
1889 }
1890
1891 async fn count(&self) -> Result<u32, async_graphql::Error> {
1892 Ok(self.set.count().await? as u32)
1893 }
1894 }
1895
1896 #[tokio::test]
1897 async fn test_graphql_elements_without_count() -> Result<(), Box<dyn std::error::Error>> {
1898 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1899
1900 let query = r#"
1902 query {
1903 testSet {
1904 elements
1905 }
1906 }
1907 "#;
1908
1909 let result = schema.execute(query).await;
1910 assert!(result.errors.is_empty());
1911
1912 let data = result.data.into_json()?;
1913 let elements = &data["testSet"]["elements"];
1914 assert!(elements.is_array());
1915 assert_eq!(elements.as_array().unwrap().len(), 5);
1916
1917 Ok(())
1918 }
1919
1920 #[tokio::test]
1921 async fn test_graphql_elements_with_count() -> Result<(), Box<dyn std::error::Error>> {
1922 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1923
1924 let query = r#"
1926 query {
1927 testSet {
1928 elements(count: 3)
1929 }
1930 }
1931 "#;
1932
1933 let result = schema.execute(query).await;
1934 assert!(result.errors.is_empty());
1935
1936 let data = result.data.into_json()?;
1937 let elements = &data["testSet"]["elements"];
1938 assert!(elements.is_array());
1939 assert_eq!(elements.as_array().unwrap().len(), 3);
1941
1942 Ok(())
1943 }
1944
1945 #[tokio::test]
1946 async fn test_graphql_count_field() -> Result<(), Box<dyn std::error::Error>> {
1947 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1948
1949 let query = r#"
1950 query {
1951 testSet {
1952 count
1953 }
1954 }
1955 "#;
1956
1957 let result = schema.execute(query).await;
1958 assert!(result.errors.is_empty());
1959
1960 let data = result.data.into_json()?;
1961 let count = &data["testSet"]["count"];
1962 assert_eq!(count.as_u64().unwrap(), 5);
1963
1964 Ok(())
1965 }
1966 }
1967}