1use std::{borrow::Borrow, collections::BTreeMap, marker::PhantomData};
5
6use allocative::Allocative;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use serde::{de::DeserializeOwned, Serialize};
10
11use crate::{
12 batch::Batch,
13 common::{CustomSerialize, HasherOutput, Update},
14 context::{BaseKey, Context},
15 hashable_wrapper::WrappedHashableContainerView,
16 historical_hash_wrapper::HistoricallyHashableView,
17 store::ReadableKeyValueStore as _,
18 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError},
19};
20
21#[cfg(with_metrics)]
22mod metrics {
23 use std::sync::LazyLock;
24
25 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
26 use prometheus::HistogramVec;
27
28 pub static SET_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
30 register_histogram_vec(
31 "set_view_hash_runtime",
32 "SetView hash runtime",
33 &[],
34 exponential_bucket_latencies(5.0),
35 )
36 });
37}
38
39#[derive(Debug, Allocative)]
41#[allocative(bound = "C")]
42pub struct ByteSetView<C> {
43 #[allocative(skip)]
45 context: C,
46 delete_storage_first: bool,
48 updates: BTreeMap<Vec<u8>, Update<()>>,
50}
51
52impl<C: Context, C2: Context> ReplaceContext<C2> for ByteSetView<C> {
53 type Target = ByteSetView<C2>;
54
55 async fn with_context(
56 &mut self,
57 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
58 ) -> Self::Target {
59 ByteSetView {
60 context: ctx(&self.context),
61 delete_storage_first: self.delete_storage_first,
62 updates: self.updates.clone(),
63 }
64 }
65}
66
67impl<C: Context> View for ByteSetView<C> {
68 const NUM_INIT_KEYS: usize = 0;
69
70 type Context = C;
71
72 fn context(&self) -> C {
73 self.context.clone()
74 }
75
76 fn pre_load(_context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
77 Ok(Vec::new())
78 }
79
80 fn post_load(context: C, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
81 Ok(Self {
82 context,
83 delete_storage_first: false,
84 updates: BTreeMap::new(),
85 })
86 }
87
88 fn rollback(&mut self) {
89 self.delete_storage_first = false;
90 self.updates.clear();
91 }
92
93 async fn has_pending_changes(&self) -> bool {
94 if self.delete_storage_first {
95 return true;
96 }
97 !self.updates.is_empty()
98 }
99
100 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
101 let mut delete_view = false;
102 if self.delete_storage_first {
103 delete_view = true;
104 batch.delete_key_prefix(self.context.base_key().bytes.clone());
105 for (index, update) in self.updates.iter() {
106 if let Update::Set(_) = update {
107 let key = self.context.base_key().base_index(index);
108 batch.put_key_value_bytes(key, Vec::new());
109 delete_view = false;
110 }
111 }
112 } else {
113 for (index, update) in self.updates.iter() {
114 let key = self.context.base_key().base_index(index);
115 match update {
116 Update::Removed => batch.delete_key(key),
117 Update::Set(_) => batch.put_key_value_bytes(key, Vec::new()),
118 }
119 }
120 }
121 Ok(delete_view)
122 }
123
124 fn post_save(&mut self) {
125 self.delete_storage_first = false;
126 self.updates.clear();
127 }
128
129 fn clear(&mut self) {
130 self.delete_storage_first = true;
131 self.updates.clear();
132 }
133}
134
135impl<C: Context> ClonableView for ByteSetView<C> {
136 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
137 Ok(ByteSetView {
138 context: self.context.clone(),
139 delete_storage_first: self.delete_storage_first,
140 updates: self.updates.clone(),
141 })
142 }
143}
144
145impl<C: Context> ByteSetView<C> {
146 pub fn insert(&mut self, short_key: Vec<u8>) {
158 self.updates.insert(short_key, Update::Set(()));
159 }
160
161 pub fn remove(&mut self, short_key: Vec<u8>) {
173 if self.delete_storage_first {
174 self.updates.remove(&short_key);
176 } else {
177 self.updates.insert(short_key, Update::Removed);
178 }
179 }
180
181 pub fn extra(&self) -> &C::Extra {
183 self.context.extra()
184 }
185}
186
187impl<C: Context> ByteSetView<C> {
188 pub async fn contains(&self, short_key: &[u8]) -> Result<bool, ViewError> {
201 if let Some(update) = self.updates.get(short_key) {
202 let value = match update {
203 Update::Removed => false,
204 Update::Set(()) => true,
205 };
206 return Ok(value);
207 }
208 if self.delete_storage_first {
209 return Ok(false);
210 }
211 let key = self.context.base_key().base_index(short_key);
212 Ok(self.context.store().contains_key(&key).await?)
213 }
214}
215
216impl<C: Context> ByteSetView<C> {
217 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
230 let mut keys = Vec::new();
231 self.for_each_key(|key| {
232 keys.push(key.to_vec());
233 Ok(())
234 })
235 .await?;
236 Ok(keys)
237 }
238
239 pub async fn iterative_count(&self) -> Result<usize, ViewError> {
252 let mut count = 0;
253 self.for_each_key(|_key| {
254 count += 1;
255 Ok(())
256 })
257 .await?;
258 Ok(count)
259 }
260
261 pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
284 where
285 F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
286 {
287 let mut updates = self.updates.iter();
288 let mut update = updates.next();
289 if !self.delete_storage_first {
290 let base = &self.context.base_key().bytes;
291 for index in self.context.store().find_keys_by_prefix(base).await? {
292 loop {
293 match update {
294 Some((key, value)) if key <= &index => {
295 if let Update::Set(_) = value {
296 if !f(key)? {
297 return Ok(());
298 }
299 }
300 update = updates.next();
301 if key == &index {
302 break;
303 }
304 }
305 _ => {
306 if !f(&index)? {
307 return Ok(());
308 }
309 break;
310 }
311 }
312 }
313 }
314 }
315 while let Some((key, value)) = update {
316 if let Update::Set(_) = value {
317 if !f(key)? {
318 return Ok(());
319 }
320 }
321 update = updates.next();
322 }
323 Ok(())
324 }
325
326 pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
348 where
349 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
350 {
351 self.for_each_key_while(|key| {
352 f(key)?;
353 Ok(true)
354 })
355 .await
356 }
357}
358
359impl<C: Context> HashableView for ByteSetView<C> {
360 type Hasher = sha3::Sha3_256;
361
362 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
363 self.hash().await
364 }
365
366 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
367 #[cfg(with_metrics)]
368 let _hash_latency = metrics::SET_VIEW_HASH_RUNTIME.measure_latency();
369 let mut hasher = sha3::Sha3_256::default();
370 let mut count = 0u32;
371 self.for_each_key(|key| {
372 count += 1;
373 hasher.update_with_bytes(key)?;
374 Ok(())
375 })
376 .await?;
377 hasher.update_with_bcs_bytes(&count)?;
378 Ok(hasher.finalize())
379 }
380}
381
382#[derive(Debug, Allocative)]
384#[allocative(bound = "C, I")]
385pub struct SetView<C, I> {
386 set: ByteSetView<C>,
388 #[allocative(skip)]
390 _phantom: PhantomData<I>,
391}
392
393impl<C: Context, I: Send + Sync + Serialize, C2: Context> ReplaceContext<C2> for SetView<C, I> {
394 type Target = SetView<C2, I>;
395
396 async fn with_context(
397 &mut self,
398 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
399 ) -> Self::Target {
400 SetView {
401 set: self.set.with_context(ctx).await,
402 _phantom: self._phantom,
403 }
404 }
405}
406
407impl<C: Context, I: Send + Sync + Serialize> View for SetView<C, I> {
408 const NUM_INIT_KEYS: usize = ByteSetView::<C>::NUM_INIT_KEYS;
409
410 type Context = C;
411
412 fn context(&self) -> C {
413 self.set.context()
414 }
415
416 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
417 ByteSetView::<C>::pre_load(context)
418 }
419
420 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
421 let set = ByteSetView::post_load(context, values)?;
422 Ok(Self {
423 set,
424 _phantom: PhantomData,
425 })
426 }
427
428 fn rollback(&mut self) {
429 self.set.rollback()
430 }
431
432 async fn has_pending_changes(&self) -> bool {
433 self.set.has_pending_changes().await
434 }
435
436 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
437 self.set.pre_save(batch)
438 }
439
440 fn post_save(&mut self) {
441 self.set.post_save()
442 }
443
444 fn clear(&mut self) {
445 self.set.clear()
446 }
447}
448
449impl<C, I> ClonableView for SetView<C, I>
450where
451 C: Context,
452 I: Send + Sync + Serialize,
453{
454 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
455 Ok(SetView {
456 set: self.set.clone_unchecked()?,
457 _phantom: PhantomData,
458 })
459 }
460}
461
462impl<C: Context, I: Serialize> SetView<C, I> {
463 pub fn insert<Q>(&mut self, index: &Q) -> Result<(), ViewError>
476 where
477 I: Borrow<Q>,
478 Q: Serialize + ?Sized,
479 {
480 let short_key = BaseKey::derive_short_key(index)?;
481 self.set.insert(short_key);
482 Ok(())
483 }
484
485 pub fn remove<Q>(&mut self, index: &Q) -> Result<(), ViewError>
497 where
498 I: Borrow<Q>,
499 Q: Serialize + ?Sized,
500 {
501 let short_key = BaseKey::derive_short_key(index)?;
502 self.set.remove(short_key);
503 Ok(())
504 }
505
506 pub fn extra(&self) -> &C::Extra {
508 self.set.extra()
509 }
510}
511
512impl<C: Context, I: Serialize> SetView<C, I> {
513 pub async fn contains<Q>(&self, index: &Q) -> Result<bool, ViewError>
526 where
527 I: Borrow<Q>,
528 Q: Serialize + ?Sized,
529 {
530 let short_key = BaseKey::derive_short_key(index)?;
531 self.set.contains(&short_key).await
532 }
533}
534
535impl<C: Context, I: Serialize + DeserializeOwned + Send> SetView<C, I> {
536 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
548 let mut indices = Vec::new();
549 self.for_each_index(|index| {
550 indices.push(index);
551 Ok(())
552 })
553 .await?;
554 Ok(indices)
555 }
556
557 pub async fn iterative_count(&self) -> Result<usize, ViewError> {
569 self.set.iterative_count().await
570 }
571
572 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
596 where
597 F: FnMut(I) -> Result<bool, ViewError> + Send,
598 {
599 self.set
600 .for_each_key_while(|key| {
601 let index = BaseKey::deserialize_value(key)?;
602 f(index)
603 })
604 .await?;
605 Ok(())
606 }
607
608 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
631 where
632 F: FnMut(I) -> Result<(), ViewError> + Send,
633 {
634 self.set
635 .for_each_key(|key| {
636 let index = BaseKey::deserialize_value(key)?;
637 f(index)
638 })
639 .await?;
640 Ok(())
641 }
642}
643
644impl<C, I> HashableView for SetView<C, I>
645where
646 Self: View,
647 ByteSetView<C>: HashableView,
648{
649 type Hasher = <ByteSetView<C> as HashableView>::Hasher;
650
651 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
652 self.set.hash_mut().await
653 }
654
655 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
656 self.set.hash().await
657 }
658}
659
660#[derive(Debug, Allocative)]
663#[allocative(bound = "C, I")]
664pub struct CustomSetView<C, I> {
665 set: ByteSetView<C>,
667 #[allocative(skip)]
669 _phantom: PhantomData<I>,
670}
671
672impl<C, I> View for CustomSetView<C, I>
673where
674 C: Context,
675 I: Send + Sync + CustomSerialize,
676{
677 const NUM_INIT_KEYS: usize = ByteSetView::<C>::NUM_INIT_KEYS;
678
679 type Context = C;
680
681 fn context(&self) -> C {
682 self.set.context()
683 }
684
685 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
686 ByteSetView::pre_load(context)
687 }
688
689 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
690 let set = ByteSetView::post_load(context, values)?;
691 Ok(Self {
692 set,
693 _phantom: PhantomData,
694 })
695 }
696
697 fn rollback(&mut self) {
698 self.set.rollback()
699 }
700
701 async fn has_pending_changes(&self) -> bool {
702 self.set.has_pending_changes().await
703 }
704
705 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
706 self.set.pre_save(batch)
707 }
708
709 fn post_save(&mut self) {
710 self.set.post_save()
711 }
712
713 fn clear(&mut self) {
714 self.set.clear()
715 }
716}
717
718impl<C, I> ClonableView for CustomSetView<C, I>
719where
720 C: Context,
721 I: Send + Sync + CustomSerialize,
722{
723 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
724 Ok(CustomSetView {
725 set: self.set.clone_unchecked()?,
726 _phantom: PhantomData,
727 })
728 }
729}
730
731impl<C: Context, I: CustomSerialize> CustomSetView<C, I> {
732 pub fn insert<Q>(&mut self, index: &Q) -> Result<(), ViewError>
745 where
746 I: Borrow<Q>,
747 Q: CustomSerialize,
748 {
749 let short_key = index.to_custom_bytes()?;
750 self.set.insert(short_key);
751 Ok(())
752 }
753
754 pub fn remove<Q>(&mut self, index: &Q) -> Result<(), ViewError>
767 where
768 I: Borrow<Q>,
769 Q: CustomSerialize,
770 {
771 let short_key = index.to_custom_bytes()?;
772 self.set.remove(short_key);
773 Ok(())
774 }
775
776 pub fn extra(&self) -> &C::Extra {
778 self.set.extra()
779 }
780}
781
782impl<C, I> CustomSetView<C, I>
783where
784 C: Context,
785 I: CustomSerialize,
786{
787 pub async fn contains<Q>(&self, index: &Q) -> Result<bool, ViewError>
801 where
802 I: Borrow<Q>,
803 Q: CustomSerialize,
804 {
805 let short_key = index.to_custom_bytes()?;
806 self.set.contains(&short_key).await
807 }
808}
809
810impl<C, I> CustomSetView<C, I>
811where
812 C: Context,
813 I: Sync + Send + CustomSerialize,
814{
815 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
830 let mut indices = Vec::new();
831 self.for_each_index(|index| {
832 indices.push(index);
833 Ok(())
834 })
835 .await?;
836 Ok(indices)
837 }
838
839 pub async fn iterative_count(&self) -> Result<usize, ViewError> {
853 self.set.iterative_count().await
854 }
855
856 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
880 where
881 F: FnMut(I) -> Result<bool, ViewError> + Send,
882 {
883 self.set
884 .for_each_key_while(|key| {
885 let index = I::from_custom_bytes(key)?;
886 f(index)
887 })
888 .await?;
889 Ok(())
890 }
891
892 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
915 where
916 F: FnMut(I) -> Result<(), ViewError> + Send,
917 {
918 self.set
919 .for_each_key(|key| {
920 let index = I::from_custom_bytes(key)?;
921 f(index)
922 })
923 .await?;
924 Ok(())
925 }
926}
927
928impl<C: Context, I> HashableView for CustomSetView<C, I>
929where
930 Self: View,
931{
932 type Hasher = sha3::Sha3_256;
933
934 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
935 self.set.hash_mut().await
936 }
937
938 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
939 self.set.hash().await
940 }
941}
942
943pub type HashedByteSetView<C> = WrappedHashableContainerView<C, ByteSetView<C>, HasherOutput>;
945
946pub type HistoricallyHashedByteSetView<C> = HistoricallyHashableView<C, ByteSetView<C>>;
948
949pub type HashedSetView<C, I> = WrappedHashableContainerView<C, SetView<C, I>, HasherOutput>;
951
952pub type HistoricallyHashedSetView<C, I> = HistoricallyHashableView<C, SetView<C, I>>;
954
955pub type HashedCustomSetView<C, I> =
957 WrappedHashableContainerView<C, CustomSetView<C, I>, HasherOutput>;
958
959pub type HistoricallyHashedCustomSetView<C, I> = HistoricallyHashableView<C, CustomSetView<C, I>>;
961
962#[cfg(with_graphql)]
963mod graphql {
964 use std::borrow::Cow;
965
966 use serde::{de::DeserializeOwned, Serialize};
967
968 use super::{CustomSetView, SetView};
969 use crate::{
970 common::CustomSerialize,
971 context::Context,
972 graphql::{hash_name, mangle},
973 };
974
975 impl<C: Send + Sync, I: async_graphql::OutputType> async_graphql::TypeName for SetView<C, I> {
976 fn type_name() -> Cow<'static, str> {
977 format!(
978 "SetView_{}_{:08x}",
979 mangle(I::type_name()),
980 hash_name::<I>(),
981 )
982 .into()
983 }
984 }
985
986 #[async_graphql::Object(cache_control(no_cache), name_type)]
987 impl<C, I> SetView<C, I>
988 where
989 C: Context,
990 I: Send + Sync + Serialize + DeserializeOwned + async_graphql::OutputType,
991 {
992 async fn elements(&self, count: Option<usize>) -> Result<Vec<I>, async_graphql::Error> {
993 let mut indices = self.indices().await?;
994 if let Some(count) = count {
995 indices.truncate(count);
996 }
997 Ok(indices)
998 }
999
1000 #[graphql(derived(name = "count"))]
1001 async fn count_(&self) -> Result<u32, async_graphql::Error> {
1002 let count = self.iterative_count().await?;
1003 u32::try_from(count).map_err(|_| async_graphql::Error::new("count exceeds u32"))
1004 }
1005 }
1006
1007 impl<C: Send + Sync, I: async_graphql::OutputType> async_graphql::TypeName for CustomSetView<C, I> {
1008 fn type_name() -> Cow<'static, str> {
1009 format!(
1010 "CustomSetView_{}_{:08x}",
1011 mangle(I::type_name()),
1012 hash_name::<I>(),
1013 )
1014 .into()
1015 }
1016 }
1017
1018 #[async_graphql::Object(cache_control(no_cache), name_type)]
1019 impl<C, I> CustomSetView<C, I>
1020 where
1021 C: Context,
1022 I: Send + Sync + CustomSerialize + async_graphql::OutputType,
1023 {
1024 async fn elements(&self, count: Option<usize>) -> Result<Vec<I>, async_graphql::Error> {
1025 let mut indices = self.indices().await?;
1026 if let Some(count) = count {
1027 indices.truncate(count);
1028 }
1029 Ok(indices)
1030 }
1031
1032 #[graphql(derived(name = "count"))]
1033 async fn count_(&self) -> Result<u32, async_graphql::Error> {
1034 let count = self.iterative_count().await?;
1035 u32::try_from(count).map_err(|_| async_graphql::Error::new("count exceeds u32"))
1036 }
1037 }
1038}
1039
1040#[cfg(test)]
1041mod tests {
1042 use super::*;
1043 use crate::{context::MemoryContext, store::WritableKeyValueStore as _};
1044
1045 #[tokio::test]
1046 async fn test_byte_set_view_flush_with_delete_storage_first_and_set_updates(
1047 ) -> Result<(), ViewError> {
1048 let context = MemoryContext::new_for_testing(());
1049 let mut set = ByteSetView::load(context).await?;
1050 assert!(!set.has_pending_changes().await);
1052
1053 set.insert(vec![1, 2, 3]);
1055 set.insert(vec![4, 5, 6]);
1056 assert!(set.has_pending_changes().await);
1058
1059 assert_eq!(set.keys().await?, vec![vec![1, 2, 3], vec![4, 5, 6]]);
1061
1062 let mut batch = Batch::new();
1063 set.pre_save(&mut batch)?;
1064 set.context().store().write_batch(batch).await?;
1065 set.post_save();
1066 assert!(!set.has_pending_changes().await);
1068
1069 assert_eq!(set.keys().await?, vec![vec![1, 2, 3], vec![4, 5, 6]]);
1071 assert_eq!(set.iterative_count().await?, 2);
1072
1073 set.clear();
1075 assert!(set.has_pending_changes().await);
1077
1078 assert!(set.keys().await?.is_empty());
1080
1081 set.insert(vec![7, 8, 9]);
1083 set.insert(vec![10, 11, 12]);
1084 assert!(set.has_pending_changes().await);
1086
1087 assert_eq!(set.keys().await?, vec![vec![7, 8, 9], vec![10, 11, 12]]);
1089
1090 let mut batch = Batch::new();
1092 let delete_view = set.pre_save(&mut batch)?;
1093 assert!(!delete_view);
1096 assert!(!batch.is_empty());
1098
1099 set.context().store().write_batch(batch).await?;
1101 set.post_save();
1102 assert!(!set.has_pending_changes().await);
1104
1105 let new_set = ByteSetView::load(set.context().clone()).await?;
1107 assert!(new_set.contains(&[7, 8, 9]).await?);
1108 assert!(new_set.contains(&[10, 11, 12]).await?);
1109 assert!(!new_set.contains(&[1, 2, 3]).await?);
1110 assert!(!new_set.contains(&[4, 5, 6]).await?);
1111 assert!(!new_set.has_pending_changes().await);
1113
1114 Ok(())
1115 }
1116
1117 #[tokio::test]
1118 async fn test_byte_set_view_flush_with_delete_storage_first_no_set_updates(
1119 ) -> Result<(), ViewError> {
1120 let context = MemoryContext::new_for_testing(());
1121 let mut set = ByteSetView::load(context).await?;
1122
1123 set.insert(vec![1, 2, 3]);
1125 let mut batch = Batch::new();
1126 set.pre_save(&mut batch)?;
1127 set.context().store().write_batch(batch).await?;
1128 set.post_save();
1129
1130 set.clear();
1132 let mut batch = Batch::new();
1133 let delete_view = set.pre_save(&mut batch)?;
1134
1135 assert!(delete_view);
1137
1138 Ok(())
1139 }
1140
1141 #[tokio::test]
1142 async fn test_byte_set_view_flush_with_delete_storage_first_mixed_updates(
1143 ) -> Result<(), ViewError> {
1144 let context = MemoryContext::new_for_testing(());
1145 let mut set = ByteSetView::load(context).await?;
1146
1147 set.insert(vec![1, 2, 3]);
1149 set.insert(vec![4, 5, 6]);
1150 let mut batch = Batch::new();
1151 set.pre_save(&mut batch)?;
1152 set.context().store().write_batch(batch).await?;
1153 set.post_save();
1154
1155 set.clear();
1157
1158 set.insert(vec![7, 8, 9]); set.remove(vec![10, 11, 12]); let mut batch = Batch::new();
1163 let delete_view = set.pre_save(&mut batch)?;
1164
1165 assert!(!delete_view);
1167
1168 Ok(())
1169 }
1170
1171 #[tokio::test]
1172 async fn test_has_pending_changes_comprehensive() -> Result<(), ViewError> {
1173 let context = MemoryContext::new_for_testing(());
1174 let mut set = ByteSetView::load(context).await?;
1175
1176 assert!(!set.has_pending_changes().await);
1178
1179 set.insert(vec![1]);
1181 assert!(set.has_pending_changes().await);
1182
1183 set.insert(vec![2]);
1185 set.insert(vec![3]);
1186 assert!(set.has_pending_changes().await);
1187
1188 let mut batch = Batch::new();
1190 set.pre_save(&mut batch)?;
1191 set.context().store().write_batch(batch).await?;
1192 set.post_save();
1193 assert!(!set.has_pending_changes().await);
1194
1195 set.remove(vec![1]);
1197 assert!(set.has_pending_changes().await);
1198
1199 set.clear();
1201 assert!(set.has_pending_changes().await);
1202
1203 set.insert(vec![4]);
1205 assert!(set.has_pending_changes().await);
1206
1207 set.rollback();
1209 assert!(!set.has_pending_changes().await);
1210
1211 assert!(set.contains(&[2]).await?);
1213 assert!(set.contains(&[3]).await?);
1214
1215 Ok(())
1216 }
1217
1218 #[tokio::test]
1219 async fn test_for_each_key_while_match_update_pattern() -> Result<(), ViewError> {
1220 let context = MemoryContext::new_for_testing(());
1221 let mut set = ByteSetView::load(context).await?;
1222
1223 set.insert(vec![1]);
1225 set.insert(vec![3]);
1226 set.insert(vec![5]);
1227 let mut batch = Batch::new();
1228 set.pre_save(&mut batch)?;
1229 set.context().store().write_batch(batch).await?;
1230 set.post_save();
1231
1232 set.insert(vec![2]); set.insert(vec![4]); let mut keys_processed = Vec::new();
1237
1238 set.for_each_key_while(|key| {
1241 keys_processed.push(key.to_vec());
1242 Ok(true) })
1244 .await?;
1245
1246 assert_eq!(
1248 keys_processed,
1249 vec![vec![1], vec![2], vec![3], vec![4], vec![5]]
1250 );
1251
1252 Ok(())
1253 }
1254
1255 #[tokio::test]
1256 async fn test_for_each_key_while_early_return() -> Result<(), ViewError> {
1257 let context = MemoryContext::new_for_testing(());
1258 let mut set = ByteSetView::load(context).await?;
1259
1260 set.insert(vec![1]);
1262 set.insert(vec![2]);
1263 set.insert(vec![3]);
1264 let mut batch = Batch::new();
1265 set.pre_save(&mut batch)?;
1266 set.context().store().write_batch(batch).await?;
1267 set.post_save();
1268
1269 let mut count = 0;
1270
1271 set.for_each_key_while(|_key| {
1273 count += 1;
1274 if count >= 2 {
1275 Ok(false) } else {
1277 Ok(true)
1278 }
1279 })
1280 .await?;
1281
1282 assert_eq!(count, 2);
1284
1285 Ok(())
1286 }
1287
1288 #[tokio::test]
1289 async fn test_hash_mut_delegation() -> Result<(), ViewError> {
1290 let context = MemoryContext::new_for_testing(());
1291 let mut set = ByteSetView::load(context).await?;
1292
1293 set.insert(vec![1, 2, 3]);
1295 set.insert(vec![4, 5, 6]);
1296
1297 let hash1 = set.hash_mut().await?;
1299 let hash2 = set.hash().await?;
1300
1301 assert_eq!(hash1, hash2);
1303
1304 set.insert(vec![7, 8, 9]);
1306 let hash3 = set.hash_mut().await?;
1307 assert_ne!(hash1, hash3);
1308
1309 Ok(())
1310 }
1311
1312 #[tokio::test]
1313 async fn test_for_each_key_while_early_return_on_update_set() -> Result<(), ViewError> {
1314 let context = MemoryContext::new_for_testing(());
1315 let mut set = ByteSetView::load(context).await?;
1316
1317 set.insert(vec![1]);
1319 set.insert(vec![3]);
1320 let mut batch = Batch::new();
1321 set.pre_save(&mut batch)?;
1322 set.context().store().write_batch(batch).await?;
1323 set.post_save();
1324
1325 set.insert(vec![0]); set.insert(vec![2]); let mut count = 0;
1330
1331 set.for_each_key_while(|key| {
1334 count += 1;
1335 if key == [0] {
1336 Ok(false) } else {
1338 Ok(true)
1339 }
1340 })
1341 .await?;
1342
1343 assert_eq!(count, 1);
1345
1346 Ok(())
1347 }
1348
1349 #[tokio::test]
1350 async fn test_for_each_key_while_early_return_in_remaining_updates() -> Result<(), ViewError> {
1351 let context = MemoryContext::new_for_testing(());
1352 let mut set = ByteSetView::load(context).await?;
1353
1354 set.insert(vec![1]);
1357 set.insert(vec![2]);
1358 set.insert(vec![3]);
1359
1360 let mut count = 0;
1361
1362 set.for_each_key_while(|key| {
1364 count += 1;
1365 if key == [2] {
1366 Ok(false) } else {
1368 Ok(true)
1369 }
1370 })
1371 .await?;
1372
1373 assert_eq!(count, 2);
1375
1376 Ok(())
1377 }
1378
1379 #[tokio::test]
1380 async fn test_contains_update_removed_returns_false() -> Result<(), ViewError> {
1381 let context = MemoryContext::new_for_testing(());
1382 let mut set = ByteSetView::load(context).await?;
1383
1384 assert!(!set.has_pending_changes().await);
1386
1387 set.insert(vec![1, 2, 3]);
1389
1390 assert!(set.has_pending_changes().await);
1392
1393 assert_eq!(set.keys().await?, vec![vec![1, 2, 3]]);
1395
1396 let mut batch = Batch::new();
1397 set.pre_save(&mut batch)?;
1398 set.context().store().write_batch(batch).await?;
1399 set.post_save();
1400
1401 assert!(!set.has_pending_changes().await);
1403
1404 assert_eq!(set.keys().await?, vec![vec![1, 2, 3]]);
1406
1407 assert!(set.contains(&[1, 2, 3]).await?);
1409
1410 set.remove(vec![1, 2, 3]);
1412
1413 assert!(set.has_pending_changes().await);
1415
1416 assert!(set.keys().await?.is_empty());
1418
1419 assert!(!set.contains(&[1, 2, 3]).await?);
1422
1423 Ok(())
1424 }
1425
1426 #[tokio::test]
1427 async fn test_contains_delete_storage_first_returns_false() -> Result<(), ViewError> {
1428 let context = MemoryContext::new_for_testing(());
1429 let mut set = ByteSetView::load(context).await?;
1430
1431 set.insert(vec![1]);
1433 set.insert(vec![2]);
1434 set.insert(vec![3]);
1435 let mut batch = Batch::new();
1436 set.pre_save(&mut batch)?;
1437 set.context().store().write_batch(batch).await?;
1438 set.post_save();
1439
1440 assert!(set.contains(&[1]).await?);
1442 assert!(set.contains(&[2]).await?);
1443 assert!(set.contains(&[3]).await?);
1444
1445 set.clear();
1447
1448 assert!(!set.contains(&[1]).await?);
1451 assert!(!set.contains(&[2]).await?);
1452 assert!(!set.contains(&[3]).await?);
1453
1454 assert!(!set.contains(&[99]).await?);
1456
1457 Ok(())
1458 }
1459
1460 #[tokio::test]
1461 async fn test_contains_delete_storage_first_with_new_additions() -> Result<(), ViewError> {
1462 let context = MemoryContext::new_for_testing(());
1463 let mut set = ByteSetView::load(context).await?;
1464
1465 set.insert(vec![1]);
1467 set.insert(vec![2]);
1468 let mut batch = Batch::new();
1469 set.pre_save(&mut batch)?;
1470 set.context().store().write_batch(batch).await?;
1471 set.post_save();
1472
1473 set.clear();
1475
1476 set.insert(vec![3]);
1478 set.insert(vec![4]);
1479
1480 assert!(!set.contains(&[1]).await?); assert!(!set.contains(&[2]).await?); assert!(set.contains(&[3]).await?); assert!(set.contains(&[4]).await?); Ok(())
1488 }
1489
1490 #[tokio::test]
1491 async fn test_for_each_key_while_update_set_processing_in_stored_loop() -> Result<(), ViewError>
1492 {
1493 let context = MemoryContext::new_for_testing(());
1494 let mut set = ByteSetView::load(context).await?;
1495
1496 set.insert(vec![2]);
1498 set.insert(vec![4]);
1499 set.insert(vec![6]);
1500 let mut batch = Batch::new();
1501 set.pre_save(&mut batch)?;
1502 set.context().store().write_batch(batch).await?;
1503 set.post_save();
1504
1505 set.insert(vec![1]); set.insert(vec![3]); set.remove(vec![5]); let mut processed_keys = Vec::new();
1511
1512 set.for_each_key_while(|key| {
1515 processed_keys.push(key.to_vec());
1516 Ok(true)
1517 })
1518 .await?;
1519
1520 assert_eq!(
1523 processed_keys,
1524 vec![vec![1], vec![2], vec![3], vec![4], vec![6]]
1525 );
1526
1527 Ok(())
1528 }
1529
1530 #[tokio::test]
1531 async fn test_set_view_flush_with_delete_storage_first_and_set_updates() -> Result<(), ViewError>
1532 {
1533 let context = MemoryContext::new_for_testing(());
1534 let mut set = SetView::<_, u32>::load(context).await?;
1535
1536 set.insert(&42)?;
1538 set.insert(&84)?;
1539 let mut batch = Batch::new();
1540 set.pre_save(&mut batch)?;
1541 set.context().store().write_batch(batch).await?;
1542 set.post_save();
1543
1544 set.clear();
1546
1547 set.insert(&123)?;
1549 set.insert(&456)?;
1550
1551 let mut batch = Batch::new();
1552 let delete_view = set.pre_save(&mut batch)?;
1553
1554 assert!(!delete_view);
1556
1557 set.context().store().write_batch(batch).await?;
1559 set.post_save();
1560 let new_set = SetView::<_, u32>::load(set.context().clone()).await?;
1561 assert!(new_set.contains(&123).await?);
1562 assert!(new_set.contains(&456).await?);
1563 assert!(!new_set.contains(&42).await?);
1564 assert!(!new_set.contains(&84).await?);
1565
1566 Ok(())
1567 }
1568
1569 #[tokio::test]
1570 async fn test_set_view_count_delegation() -> Result<(), ViewError> {
1571 let context = MemoryContext::new_for_testing(());
1572 let mut set = SetView::<_, u32>::load(context).await?;
1573
1574 assert!(!set.has_pending_changes().await);
1576
1577 assert_eq!(set.iterative_count().await?, 0);
1579
1580 set.insert(&42)?;
1582
1583 assert!(set.has_pending_changes().await);
1585
1586 assert_eq!(set.indices().await?, vec![42]);
1588
1589 set.insert(&84)?;
1590 set.insert(&126)?;
1591
1592 assert!(set.has_pending_changes().await);
1594
1595 assert_eq!(set.indices().await?, vec![42, 84, 126]);
1597
1598 assert_eq!(set.iterative_count().await?, 3);
1600
1601 Ok(())
1602 }
1603
1604 #[tokio::test]
1605 async fn test_set_view_hash_mut_delegation() -> Result<(), ViewError> {
1606 let context = MemoryContext::new_for_testing(());
1607 let mut set = SetView::<_, u32>::load(context).await?;
1608
1609 set.insert(&42)?;
1611 set.insert(&84)?;
1612 set.insert(&126)?;
1613
1614 let hash1 = set.hash_mut().await?;
1616 let hash2 = set.hash().await?;
1617
1618 assert_eq!(hash1, hash2);
1620
1621 set.insert(&168)?;
1623 let hash3 = set.hash_mut().await?;
1624 assert_ne!(hash1, hash3);
1625
1626 let context2 = MemoryContext::new_for_testing(());
1628 let mut byte_set = ByteSetView::load(context2).await?;
1629
1630 use crate::context::BaseKey;
1632 byte_set.insert(BaseKey::derive_short_key(&42u32)?);
1633 byte_set.insert(BaseKey::derive_short_key(&84u32)?);
1634 byte_set.insert(BaseKey::derive_short_key(&126u32)?);
1635 byte_set.insert(BaseKey::derive_short_key(&168u32)?);
1636
1637 let byte_set_hash = byte_set.hash_mut().await?;
1638 assert_eq!(hash3, byte_set_hash);
1639
1640 Ok(())
1641 }
1642
1643 #[tokio::test]
1645 async fn test_custom_set_view_flush_with_delete_storage_first_and_set_updates(
1646 ) -> Result<(), ViewError> {
1647 let context = MemoryContext::new_for_testing(());
1648 let mut set = CustomSetView::<_, u128>::load(context).await?;
1649
1650 assert!(!set.has_pending_changes().await);
1652
1653 set.insert(&42u128)?;
1655 set.insert(&84u128)?;
1656
1657 assert!(set.has_pending_changes().await);
1659
1660 assert_eq!(set.indices().await?, vec![42u128, 84u128]);
1662
1663 let mut batch = Batch::new();
1664 set.pre_save(&mut batch)?;
1665 set.context().store().write_batch(batch).await?;
1666 set.post_save();
1667
1668 assert!(!set.has_pending_changes().await);
1670
1671 assert_eq!(set.indices().await?, vec![42u128, 84u128]);
1673
1674 set.clear();
1676
1677 assert!(set.has_pending_changes().await);
1679
1680 assert!(set.indices().await?.is_empty());
1682
1683 set.insert(&123u128)?;
1685 set.insert(&456u128)?;
1686
1687 assert!(set.has_pending_changes().await);
1689
1690 assert_eq!(set.indices().await?, vec![123u128, 456u128]);
1692
1693 let mut batch = Batch::new();
1694 let delete_view = set.pre_save(&mut batch)?;
1695
1696 assert!(!delete_view);
1698
1699 set.context().store().write_batch(batch).await?;
1701 set.post_save();
1702
1703 assert!(!set.has_pending_changes().await);
1705
1706 let new_set = CustomSetView::<_, u128>::load(set.context().clone()).await?;
1707 assert!(new_set.contains(&123u128).await?);
1708 assert!(new_set.contains(&456u128).await?);
1709 assert!(!new_set.contains(&42u128).await?);
1710 assert!(!new_set.contains(&84u128).await?);
1711
1712 assert!(!new_set.has_pending_changes().await);
1714
1715 Ok(())
1716 }
1717
1718 #[tokio::test]
1719 async fn test_custom_set_view_contains_update_removed_returns_false() -> Result<(), ViewError> {
1720 let context = MemoryContext::new_for_testing(());
1721 let mut set = CustomSetView::<_, u128>::load(context).await?;
1722
1723 set.insert(&12345u128)?;
1725 let mut batch = Batch::new();
1726 set.pre_save(&mut batch)?;
1727 set.context().store().write_batch(batch).await?;
1728 set.post_save();
1729
1730 assert!(set.contains(&12345u128).await?);
1732
1733 set.remove(&12345u128)?;
1735
1736 assert!(!set.contains(&12345u128).await?);
1738
1739 Ok(())
1740 }
1741
1742 #[tokio::test]
1743 async fn test_custom_set_view_contains_delete_storage_first_returns_false(
1744 ) -> Result<(), ViewError> {
1745 let context = MemoryContext::new_for_testing(());
1746 let mut set = CustomSetView::<_, u128>::load(context).await?;
1747
1748 set.insert(&111u128)?;
1750 set.insert(&222u128)?;
1751 set.insert(&333u128)?;
1752 let mut batch = Batch::new();
1753 set.pre_save(&mut batch)?;
1754 set.context().store().write_batch(batch).await?;
1755 set.post_save();
1756
1757 assert!(set.contains(&111u128).await?);
1759 assert!(set.contains(&222u128).await?);
1760
1761 set.clear();
1763
1764 assert!(!set.contains(&111u128).await?);
1766 assert!(!set.contains(&222u128).await?);
1767 assert!(!set.contains(&333u128).await?);
1768
1769 Ok(())
1770 }
1771
1772 #[tokio::test]
1773 async fn test_custom_set_view_hash_mut_delegation() -> Result<(), ViewError> {
1774 let context = MemoryContext::new_for_testing(());
1775 let mut set = CustomSetView::<_, u128>::load(context).await?;
1776
1777 set.insert(&1000u128)?;
1779 set.insert(&2000u128)?;
1780
1781 let hash1 = set.hash_mut().await?;
1783 let hash2 = set.hash().await?;
1784
1785 assert_eq!(hash1, hash2);
1787
1788 set.insert(&3000u128)?;
1790 let hash3 = set.hash_mut().await?;
1791 assert_ne!(hash1, hash3);
1792
1793 Ok(())
1794 }
1795
1796 #[tokio::test]
1797 async fn test_custom_set_view_for_each_index_while_method_signature() -> Result<(), ViewError> {
1798 let context = MemoryContext::new_for_testing(());
1799 let mut set = CustomSetView::<_, u128>::load(context).await?;
1800 assert_eq!(set.iterative_count().await?, 0);
1801
1802 set.insert(&100u128)?;
1804 set.insert(&200u128)?;
1805 set.insert(&300u128)?;
1806
1807 assert_eq!(set.iterative_count().await?, 3);
1808
1809 let mut collected_indices = Vec::new();
1810
1811 set.for_each_index_while(|index| {
1814 collected_indices.push(index);
1815 Ok(true)
1816 })
1817 .await?;
1818
1819 assert_eq!(collected_indices, vec![100u128, 200u128, 300u128]);
1821
1822 Ok(())
1823 }
1824
1825 #[tokio::test]
1826 async fn test_custom_set_view_rollback() -> Result<(), ViewError> {
1827 let context = MemoryContext::new_for_testing(());
1828 let mut set = CustomSetView::<_, u128>::load(context).await?;
1829
1830 set.insert(&100u128)?;
1832 set.insert(&200u128)?;
1833 let mut batch = Batch::new();
1834 set.pre_save(&mut batch)?;
1835 set.context().store().write_batch(batch).await?;
1836 set.post_save();
1837
1838 assert!(set.contains(&100u128).await?);
1840 assert!(set.contains(&200u128).await?);
1841 assert!(!set.has_pending_changes().await);
1842
1843 set.insert(&300u128)?;
1845 set.remove(&100u128)?;
1846 assert!(set.has_pending_changes().await);
1847
1848 assert!(set.contains(&300u128).await?);
1850 assert!(!set.contains(&100u128).await?);
1851
1852 set.rollback();
1854
1855 assert!(!set.has_pending_changes().await);
1857 assert!(set.contains(&100u128).await?);
1858 assert!(set.contains(&200u128).await?);
1859 assert!(!set.contains(&300u128).await?);
1860
1861 Ok(())
1862 }
1863
1864 #[tokio::test]
1865 async fn test_custom_set_view_clone_unchecked() -> Result<(), ViewError> {
1866 let context = MemoryContext::new_for_testing(());
1867 let mut set = CustomSetView::<_, u128>::load(context).await?;
1868
1869 set.insert(&42u128)?;
1871 set.insert(&84u128)?;
1872
1873 let mut cloned_set = set.clone_unchecked()?;
1875
1876 assert!(cloned_set.contains(&42u128).await?);
1878 assert!(cloned_set.contains(&84u128).await?);
1879
1880 cloned_set.insert(&126u128)?;
1882 assert!(cloned_set.contains(&126u128).await?);
1883 assert!(!set.contains(&126u128).await?);
1884
1885 set.insert(&168u128)?;
1887 assert!(set.contains(&168u128).await?);
1888 assert!(!cloned_set.contains(&168u128).await?);
1889
1890 Ok(())
1891 }
1892
1893 #[cfg(with_graphql)]
1894 mod graphql_tests {
1895 use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema};
1896
1897 use super::*;
1898
1899 struct Query;
1901
1902 #[Object]
1903 impl Query {
1904 async fn test_set(&self) -> TestSetView {
1905 let context = MemoryContext::new_for_testing(());
1906 let mut set = SetView::<_, u32>::load(context).await.unwrap();
1907
1908 set.insert(&42).unwrap();
1910 set.insert(&84).unwrap();
1911 set.insert(&126).unwrap();
1912 set.insert(&168).unwrap();
1913 set.insert(&210).unwrap();
1914
1915 TestSetView { set }
1916 }
1917 }
1918
1919 struct TestSetView {
1920 set: SetView<MemoryContext<()>, u32>,
1921 }
1922
1923 #[Object]
1924 impl TestSetView {
1925 async fn elements(
1926 &self,
1927 count: Option<usize>,
1928 ) -> Result<Vec<u32>, async_graphql::Error> {
1929 let mut indices = self.set.indices().await?;
1931 if let Some(count) = count {
1932 indices.truncate(count);
1934 }
1935 Ok(indices)
1936 }
1937
1938 async fn count(&self) -> Result<u32, async_graphql::Error> {
1939 let count = self.set.iterative_count().await?;
1940 u32::try_from(count).map_err(|_| async_graphql::Error::new("count exceeds u32"))
1941 }
1942 }
1943
1944 #[tokio::test]
1945 async fn test_graphql_elements_without_count() -> Result<(), Box<dyn std::error::Error>> {
1946 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1947
1948 let query = r#"
1950 query {
1951 testSet {
1952 elements
1953 }
1954 }
1955 "#;
1956
1957 let result = schema.execute(query).await;
1958 assert!(result.errors.is_empty());
1959
1960 let data = result.data.into_json()?;
1961 let elements = &data["testSet"]["elements"];
1962 assert!(elements.is_array());
1963 assert_eq!(elements.as_array().unwrap().len(), 5);
1964
1965 Ok(())
1966 }
1967
1968 #[tokio::test]
1969 async fn test_graphql_elements_with_count() -> Result<(), Box<dyn std::error::Error>> {
1970 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1971
1972 let query = r#"
1974 query {
1975 testSet {
1976 elements(count: 3)
1977 }
1978 }
1979 "#;
1980
1981 let result = schema.execute(query).await;
1982 assert!(result.errors.is_empty());
1983
1984 let data = result.data.into_json()?;
1985 let elements = &data["testSet"]["elements"];
1986 assert!(elements.is_array());
1987 assert_eq!(elements.as_array().unwrap().len(), 3);
1989
1990 Ok(())
1991 }
1992
1993 #[tokio::test]
1994 async fn test_graphql_count_field() -> Result<(), Box<dyn std::error::Error>> {
1995 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1996
1997 let query = r#"
1998 query {
1999 testSet {
2000 count
2001 }
2002 }
2003 "#;
2004
2005 let result = schema.execute(query).await;
2006 assert!(result.errors.is_empty());
2007
2008 let data = result.data.into_json()?;
2009 let count = &data["testSet"]["count"];
2010 assert_eq!(count.as_u64().unwrap(), 5);
2011
2012 Ok(())
2013 }
2014 }
2015}