1use std::{borrow::Borrow, collections::BTreeMap, marker::PhantomData};
5
6use allocative::Allocative;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use serde::{de::DeserializeOwned, Serialize};
10
11use crate::{
12 batch::Batch,
13 common::{CustomSerialize, HasherOutput, Update},
14 context::{BaseKey, Context},
15 hashable_wrapper::WrappedHashableContainerView,
16 historical_hash_wrapper::HistoricallyHashableView,
17 store::ReadableKeyValueStore as _,
18 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError},
19};
20
21#[cfg(with_metrics)]
22mod metrics {
23 use std::sync::LazyLock;
24
25 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
26 use prometheus::HistogramVec;
27
28 pub static SET_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
30 register_histogram_vec(
31 "set_view_hash_runtime",
32 "SetView hash runtime",
33 &[],
34 exponential_bucket_latencies(5.0),
35 )
36 });
37}
38
39#[derive(Debug, Allocative)]
41#[allocative(bound = "C")]
42pub struct ByteSetView<C> {
43 #[allocative(skip)]
45 context: C,
46 delete_storage_first: bool,
48 updates: BTreeMap<Vec<u8>, Update<()>>,
50}
51
52impl<C: Context, C2: Context> ReplaceContext<C2> for ByteSetView<C> {
53 type Target = ByteSetView<C2>;
54
55 async fn with_context(
56 &mut self,
57 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
58 ) -> Self::Target {
59 ByteSetView {
60 context: ctx(&self.context),
61 delete_storage_first: self.delete_storage_first,
62 updates: self.updates.clone(),
63 }
64 }
65}
66
67impl<C: Context> View for ByteSetView<C> {
68 const NUM_INIT_KEYS: usize = 0;
69
70 type Context = C;
71
72 fn context(&self) -> C {
73 self.context.clone()
74 }
75
76 fn pre_load(_context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
77 Ok(Vec::new())
78 }
79
80 fn post_load(context: C, _values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
81 Ok(Self {
82 context,
83 delete_storage_first: false,
84 updates: BTreeMap::new(),
85 })
86 }
87
88 fn rollback(&mut self) {
89 self.delete_storage_first = false;
90 self.updates.clear();
91 }
92
93 async fn has_pending_changes(&self) -> bool {
94 if self.delete_storage_first {
95 return true;
96 }
97 !self.updates.is_empty()
98 }
99
100 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
101 let mut delete_view = false;
102 if self.delete_storage_first {
103 delete_view = true;
104 batch.delete_key_prefix(self.context.base_key().bytes.clone());
105 for (index, update) in self.updates.iter() {
106 if let Update::Set(_) = update {
107 let key = self.context.base_key().base_index(index);
108 batch.put_key_value_bytes(key, Vec::new());
109 delete_view = false;
110 }
111 }
112 } else {
113 for (index, update) in self.updates.iter() {
114 let key = self.context.base_key().base_index(index);
115 match update {
116 Update::Removed => batch.delete_key(key),
117 Update::Set(_) => batch.put_key_value_bytes(key, Vec::new()),
118 }
119 }
120 }
121 Ok(delete_view)
122 }
123
124 fn post_save(&mut self) {
125 self.delete_storage_first = false;
126 self.updates.clear();
127 }
128
129 fn clear(&mut self) {
130 self.delete_storage_first = true;
131 self.updates.clear();
132 }
133}
134
135impl<C: Context> ClonableView for ByteSetView<C> {
136 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
137 Ok(ByteSetView {
138 context: self.context.clone(),
139 delete_storage_first: self.delete_storage_first,
140 updates: self.updates.clone(),
141 })
142 }
143}
144
145impl<C: Context> ByteSetView<C> {
146 pub fn insert(&mut self, short_key: Vec<u8>) {
158 self.updates.insert(short_key, Update::Set(()));
159 }
160
161 pub fn remove(&mut self, short_key: Vec<u8>) {
173 if self.delete_storage_first {
174 self.updates.remove(&short_key);
176 } else {
177 self.updates.insert(short_key, Update::Removed);
178 }
179 }
180
181 pub fn extra(&self) -> &C::Extra {
183 self.context.extra()
184 }
185}
186
187impl<C: Context> ByteSetView<C> {
188 pub async fn contains(&self, short_key: &[u8]) -> Result<bool, ViewError> {
201 if let Some(update) = self.updates.get(short_key) {
202 let value = match update {
203 Update::Removed => false,
204 Update::Set(()) => true,
205 };
206 return Ok(value);
207 }
208 if self.delete_storage_first {
209 return Ok(false);
210 }
211 let key = self.context.base_key().base_index(short_key);
212 Ok(self.context.store().contains_key(&key).await?)
213 }
214}
215
216impl<C: Context> ByteSetView<C> {
217 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, ViewError> {
230 let mut keys = Vec::new();
231 self.for_each_key(|key| {
232 keys.push(key.to_vec());
233 Ok(())
234 })
235 .await?;
236 Ok(keys)
237 }
238
239 pub async fn count(&self) -> Result<usize, ViewError> {
252 let mut count = 0;
253 self.for_each_key(|_key| {
254 count += 1;
255 Ok(())
256 })
257 .await?;
258 Ok(count)
259 }
260
261 pub async fn for_each_key_while<F>(&self, mut f: F) -> Result<(), ViewError>
284 where
285 F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
286 {
287 let mut updates = self.updates.iter();
288 let mut update = updates.next();
289 if !self.delete_storage_first {
290 let base = &self.context.base_key().bytes;
291 for index in self.context.store().find_keys_by_prefix(base).await? {
292 loop {
293 match update {
294 Some((key, value)) if key <= &index => {
295 if let Update::Set(_) = value {
296 if !f(key)? {
297 return Ok(());
298 }
299 }
300 update = updates.next();
301 if key == &index {
302 break;
303 }
304 }
305 _ => {
306 if !f(&index)? {
307 return Ok(());
308 }
309 break;
310 }
311 }
312 }
313 }
314 }
315 while let Some((key, value)) = update {
316 if let Update::Set(_) = value {
317 if !f(key)? {
318 return Ok(());
319 }
320 }
321 update = updates.next();
322 }
323 Ok(())
324 }
325
326 pub async fn for_each_key<F>(&self, mut f: F) -> Result<(), ViewError>
348 where
349 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
350 {
351 self.for_each_key_while(|key| {
352 f(key)?;
353 Ok(true)
354 })
355 .await
356 }
357}
358
359impl<C: Context> HashableView for ByteSetView<C> {
360 type Hasher = sha3::Sha3_256;
361
362 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
363 self.hash().await
364 }
365
366 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
367 #[cfg(with_metrics)]
368 let _hash_latency = metrics::SET_VIEW_HASH_RUNTIME.measure_latency();
369 let mut hasher = sha3::Sha3_256::default();
370 let mut count = 0u32;
371 self.for_each_key(|key| {
372 count += 1;
373 hasher.update_with_bytes(key)?;
374 Ok(())
375 })
376 .await?;
377 hasher.update_with_bcs_bytes(&count)?;
378 Ok(hasher.finalize())
379 }
380}
381
382#[derive(Debug, Allocative)]
384#[allocative(bound = "C, I")]
385pub struct SetView<C, I> {
386 set: ByteSetView<C>,
388 #[allocative(skip)]
390 _phantom: PhantomData<I>,
391}
392
393impl<C: Context, I: Send + Sync + Serialize, C2: Context> ReplaceContext<C2> for SetView<C, I> {
394 type Target = SetView<C2, I>;
395
396 async fn with_context(
397 &mut self,
398 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
399 ) -> Self::Target {
400 SetView {
401 set: self.set.with_context(ctx).await,
402 _phantom: self._phantom,
403 }
404 }
405}
406
407impl<C: Context, I: Send + Sync + Serialize> View for SetView<C, I> {
408 const NUM_INIT_KEYS: usize = ByteSetView::<C>::NUM_INIT_KEYS;
409
410 type Context = C;
411
412 fn context(&self) -> C {
413 self.set.context()
414 }
415
416 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
417 ByteSetView::<C>::pre_load(context)
418 }
419
420 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
421 let set = ByteSetView::post_load(context, values)?;
422 Ok(Self {
423 set,
424 _phantom: PhantomData,
425 })
426 }
427
428 fn rollback(&mut self) {
429 self.set.rollback()
430 }
431
432 async fn has_pending_changes(&self) -> bool {
433 self.set.has_pending_changes().await
434 }
435
436 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
437 self.set.pre_save(batch)
438 }
439
440 fn post_save(&mut self) {
441 self.set.post_save()
442 }
443
444 fn clear(&mut self) {
445 self.set.clear()
446 }
447}
448
449impl<C, I> ClonableView for SetView<C, I>
450where
451 C: Context,
452 I: Send + Sync + Serialize,
453{
454 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
455 Ok(SetView {
456 set: self.set.clone_unchecked()?,
457 _phantom: PhantomData,
458 })
459 }
460}
461
462impl<C: Context, I: Serialize> SetView<C, I> {
463 pub fn insert<Q>(&mut self, index: &Q) -> Result<(), ViewError>
476 where
477 I: Borrow<Q>,
478 Q: Serialize + ?Sized,
479 {
480 let short_key = BaseKey::derive_short_key(index)?;
481 self.set.insert(short_key);
482 Ok(())
483 }
484
485 pub fn remove<Q>(&mut self, index: &Q) -> Result<(), ViewError>
497 where
498 I: Borrow<Q>,
499 Q: Serialize + ?Sized,
500 {
501 let short_key = BaseKey::derive_short_key(index)?;
502 self.set.remove(short_key);
503 Ok(())
504 }
505
506 pub fn extra(&self) -> &C::Extra {
508 self.set.extra()
509 }
510}
511
512impl<C: Context, I: Serialize> SetView<C, I> {
513 pub async fn contains<Q>(&self, index: &Q) -> Result<bool, ViewError>
526 where
527 I: Borrow<Q>,
528 Q: Serialize + ?Sized,
529 {
530 let short_key = BaseKey::derive_short_key(index)?;
531 self.set.contains(&short_key).await
532 }
533}
534
535impl<C: Context, I: Serialize + DeserializeOwned + Send> SetView<C, I> {
536 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
548 let mut indices = Vec::new();
549 self.for_each_index(|index| {
550 indices.push(index);
551 Ok(())
552 })
553 .await?;
554 Ok(indices)
555 }
556
557 pub async fn count(&self) -> Result<usize, ViewError> {
569 self.set.count().await
570 }
571
572 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
596 where
597 F: FnMut(I) -> Result<bool, ViewError> + Send,
598 {
599 self.set
600 .for_each_key_while(|key| {
601 let index = BaseKey::deserialize_value(key)?;
602 f(index)
603 })
604 .await?;
605 Ok(())
606 }
607
608 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
631 where
632 F: FnMut(I) -> Result<(), ViewError> + Send,
633 {
634 self.set
635 .for_each_key(|key| {
636 let index = BaseKey::deserialize_value(key)?;
637 f(index)
638 })
639 .await?;
640 Ok(())
641 }
642}
643
644impl<C, I> HashableView for SetView<C, I>
645where
646 Self: View,
647 ByteSetView<C>: HashableView,
648{
649 type Hasher = <ByteSetView<C> as HashableView>::Hasher;
650
651 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
652 self.set.hash_mut().await
653 }
654
655 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
656 self.set.hash().await
657 }
658}
659
660#[derive(Debug, Allocative)]
663#[allocative(bound = "C, I")]
664pub struct CustomSetView<C, I> {
665 set: ByteSetView<C>,
667 #[allocative(skip)]
669 _phantom: PhantomData<I>,
670}
671
672impl<C, I> View for CustomSetView<C, I>
673where
674 C: Context,
675 I: Send + Sync + CustomSerialize,
676{
677 const NUM_INIT_KEYS: usize = ByteSetView::<C>::NUM_INIT_KEYS;
678
679 type Context = C;
680
681 fn context(&self) -> C {
682 self.set.context()
683 }
684
685 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
686 ByteSetView::pre_load(context)
687 }
688
689 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
690 let set = ByteSetView::post_load(context, values)?;
691 Ok(Self {
692 set,
693 _phantom: PhantomData,
694 })
695 }
696
697 fn rollback(&mut self) {
698 self.set.rollback()
699 }
700
701 async fn has_pending_changes(&self) -> bool {
702 self.set.has_pending_changes().await
703 }
704
705 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
706 self.set.pre_save(batch)
707 }
708
709 fn post_save(&mut self) {
710 self.set.post_save()
711 }
712
713 fn clear(&mut self) {
714 self.set.clear()
715 }
716}
717
718impl<C, I> ClonableView for CustomSetView<C, I>
719where
720 C: Context,
721 I: Send + Sync + CustomSerialize,
722{
723 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
724 Ok(CustomSetView {
725 set: self.set.clone_unchecked()?,
726 _phantom: PhantomData,
727 })
728 }
729}
730
731impl<C: Context, I: CustomSerialize> CustomSetView<C, I> {
732 pub fn insert<Q>(&mut self, index: &Q) -> Result<(), ViewError>
745 where
746 I: Borrow<Q>,
747 Q: CustomSerialize,
748 {
749 let short_key = index.to_custom_bytes()?;
750 self.set.insert(short_key);
751 Ok(())
752 }
753
754 pub fn remove<Q>(&mut self, index: &Q) -> Result<(), ViewError>
767 where
768 I: Borrow<Q>,
769 Q: CustomSerialize,
770 {
771 let short_key = index.to_custom_bytes()?;
772 self.set.remove(short_key);
773 Ok(())
774 }
775
776 pub fn extra(&self) -> &C::Extra {
778 self.set.extra()
779 }
780}
781
782impl<C, I> CustomSetView<C, I>
783where
784 C: Context,
785 I: CustomSerialize,
786{
787 pub async fn contains<Q>(&self, index: &Q) -> Result<bool, ViewError>
801 where
802 I: Borrow<Q>,
803 Q: CustomSerialize,
804 {
805 let short_key = index.to_custom_bytes()?;
806 self.set.contains(&short_key).await
807 }
808}
809
810impl<C, I> CustomSetView<C, I>
811where
812 C: Context,
813 I: Sync + Send + CustomSerialize,
814{
815 pub async fn indices(&self) -> Result<Vec<I>, ViewError> {
830 let mut indices = Vec::new();
831 self.for_each_index(|index| {
832 indices.push(index);
833 Ok(())
834 })
835 .await?;
836 Ok(indices)
837 }
838
839 pub async fn count(&self) -> Result<usize, ViewError> {
853 self.set.count().await
854 }
855
856 pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
880 where
881 F: FnMut(I) -> Result<bool, ViewError> + Send,
882 {
883 self.set
884 .for_each_key_while(|key| {
885 let index = I::from_custom_bytes(key)?;
886 f(index)
887 })
888 .await?;
889 Ok(())
890 }
891
892 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
915 where
916 F: FnMut(I) -> Result<(), ViewError> + Send,
917 {
918 self.set
919 .for_each_key(|key| {
920 let index = I::from_custom_bytes(key)?;
921 f(index)
922 })
923 .await?;
924 Ok(())
925 }
926}
927
928impl<C: Context, I> HashableView for CustomSetView<C, I>
929where
930 Self: View,
931{
932 type Hasher = sha3::Sha3_256;
933
934 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
935 self.set.hash_mut().await
936 }
937
938 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
939 self.set.hash().await
940 }
941}
942
943pub type HashedByteSetView<C> = WrappedHashableContainerView<C, ByteSetView<C>, HasherOutput>;
945
946pub type HistoricallyHashedByteSetView<C> = HistoricallyHashableView<C, ByteSetView<C>>;
948
949pub type HashedSetView<C, I> = WrappedHashableContainerView<C, SetView<C, I>, HasherOutput>;
951
952pub type HistoricallyHashedSetView<C, I> = HistoricallyHashableView<C, SetView<C, I>>;
954
955pub type HashedCustomSetView<C, I> =
957 WrappedHashableContainerView<C, CustomSetView<C, I>, HasherOutput>;
958
959pub type HistoricallyHashedCustomSetView<C, I> = HistoricallyHashableView<C, CustomSetView<C, I>>;
961
962#[cfg(with_graphql)]
963mod graphql {
964 use std::borrow::Cow;
965
966 use serde::{de::DeserializeOwned, Serialize};
967
968 use super::{CustomSetView, SetView};
969 use crate::{
970 common::CustomSerialize,
971 context::Context,
972 graphql::{hash_name, mangle},
973 };
974
975 impl<C: Send + Sync, I: async_graphql::OutputType> async_graphql::TypeName for SetView<C, I> {
976 fn type_name() -> Cow<'static, str> {
977 format!(
978 "SetView_{}_{:08x}",
979 mangle(I::type_name()),
980 hash_name::<I>(),
981 )
982 .into()
983 }
984 }
985
986 #[async_graphql::Object(cache_control(no_cache), name_type)]
987 impl<C, I> SetView<C, I>
988 where
989 C: Context,
990 I: Send + Sync + Serialize + DeserializeOwned + async_graphql::OutputType,
991 {
992 async fn elements(&self, count: Option<usize>) -> Result<Vec<I>, async_graphql::Error> {
993 let mut indices = self.indices().await?;
994 if let Some(count) = count {
995 indices.truncate(count);
996 }
997 Ok(indices)
998 }
999
1000 #[graphql(derived(name = "count"))]
1001 async fn count_(&self) -> Result<u32, async_graphql::Error> {
1002 Ok(self.count().await? as u32)
1003 }
1004 }
1005
1006 impl<C: Send + Sync, I: async_graphql::OutputType> async_graphql::TypeName for CustomSetView<C, I> {
1007 fn type_name() -> Cow<'static, str> {
1008 format!(
1009 "CustomSetView_{}_{:08x}",
1010 mangle(I::type_name()),
1011 hash_name::<I>(),
1012 )
1013 .into()
1014 }
1015 }
1016
1017 #[async_graphql::Object(cache_control(no_cache), name_type)]
1018 impl<C, I> CustomSetView<C, I>
1019 where
1020 C: Context,
1021 I: Send + Sync + CustomSerialize + async_graphql::OutputType,
1022 {
1023 async fn elements(&self, count: Option<usize>) -> Result<Vec<I>, async_graphql::Error> {
1024 let mut indices = self.indices().await?;
1025 if let Some(count) = count {
1026 indices.truncate(count);
1027 }
1028 Ok(indices)
1029 }
1030
1031 #[graphql(derived(name = "count"))]
1032 async fn count_(&self) -> Result<u32, async_graphql::Error> {
1033 Ok(self.count().await? as u32)
1034 }
1035 }
1036}
1037
1038#[cfg(test)]
1039mod tests {
1040 use super::*;
1041 use crate::{context::MemoryContext, store::WritableKeyValueStore as _};
1042
1043 #[tokio::test]
1044 async fn test_byte_set_view_flush_with_delete_storage_first_and_set_updates(
1045 ) -> Result<(), ViewError> {
1046 let context = MemoryContext::new_for_testing(());
1047 let mut set = ByteSetView::load(context).await?;
1048 assert!(!set.has_pending_changes().await);
1050
1051 set.insert(vec![1, 2, 3]);
1053 set.insert(vec![4, 5, 6]);
1054 assert!(set.has_pending_changes().await);
1056
1057 assert_eq!(set.keys().await?, vec![vec![1, 2, 3], vec![4, 5, 6]]);
1059
1060 let mut batch = Batch::new();
1061 set.pre_save(&mut batch)?;
1062 set.context().store().write_batch(batch).await?;
1063 set.post_save();
1064 assert!(!set.has_pending_changes().await);
1066
1067 assert_eq!(set.keys().await?, vec![vec![1, 2, 3], vec![4, 5, 6]]);
1069 assert_eq!(set.count().await?, 2);
1070
1071 set.clear();
1073 assert!(set.has_pending_changes().await);
1075
1076 assert!(set.keys().await?.is_empty());
1078
1079 set.insert(vec![7, 8, 9]);
1081 set.insert(vec![10, 11, 12]);
1082 assert!(set.has_pending_changes().await);
1084
1085 assert_eq!(set.keys().await?, vec![vec![7, 8, 9], vec![10, 11, 12]]);
1087
1088 let mut batch = Batch::new();
1090 let delete_view = set.pre_save(&mut batch)?;
1091 assert!(!delete_view);
1094 assert!(!batch.is_empty());
1096
1097 set.context().store().write_batch(batch).await?;
1099 set.post_save();
1100 assert!(!set.has_pending_changes().await);
1102
1103 let new_set = ByteSetView::load(set.context().clone()).await?;
1105 assert!(new_set.contains(&[7, 8, 9]).await?);
1106 assert!(new_set.contains(&[10, 11, 12]).await?);
1107 assert!(!new_set.contains(&[1, 2, 3]).await?);
1108 assert!(!new_set.contains(&[4, 5, 6]).await?);
1109 assert!(!new_set.has_pending_changes().await);
1111
1112 Ok(())
1113 }
1114
1115 #[tokio::test]
1116 async fn test_byte_set_view_flush_with_delete_storage_first_no_set_updates(
1117 ) -> Result<(), ViewError> {
1118 let context = MemoryContext::new_for_testing(());
1119 let mut set = ByteSetView::load(context).await?;
1120
1121 set.insert(vec![1, 2, 3]);
1123 let mut batch = Batch::new();
1124 set.pre_save(&mut batch)?;
1125 set.context().store().write_batch(batch).await?;
1126 set.post_save();
1127
1128 set.clear();
1130 let mut batch = Batch::new();
1131 let delete_view = set.pre_save(&mut batch)?;
1132
1133 assert!(delete_view);
1135
1136 Ok(())
1137 }
1138
1139 #[tokio::test]
1140 async fn test_byte_set_view_flush_with_delete_storage_first_mixed_updates(
1141 ) -> Result<(), ViewError> {
1142 let context = MemoryContext::new_for_testing(());
1143 let mut set = ByteSetView::load(context).await?;
1144
1145 set.insert(vec![1, 2, 3]);
1147 set.insert(vec![4, 5, 6]);
1148 let mut batch = Batch::new();
1149 set.pre_save(&mut batch)?;
1150 set.context().store().write_batch(batch).await?;
1151 set.post_save();
1152
1153 set.clear();
1155
1156 set.insert(vec![7, 8, 9]); set.remove(vec![10, 11, 12]); let mut batch = Batch::new();
1161 let delete_view = set.pre_save(&mut batch)?;
1162
1163 assert!(!delete_view);
1165
1166 Ok(())
1167 }
1168
1169 #[tokio::test]
1170 async fn test_has_pending_changes_comprehensive() -> Result<(), ViewError> {
1171 let context = MemoryContext::new_for_testing(());
1172 let mut set = ByteSetView::load(context).await?;
1173
1174 assert!(!set.has_pending_changes().await);
1176
1177 set.insert(vec![1]);
1179 assert!(set.has_pending_changes().await);
1180
1181 set.insert(vec![2]);
1183 set.insert(vec![3]);
1184 assert!(set.has_pending_changes().await);
1185
1186 let mut batch = Batch::new();
1188 set.pre_save(&mut batch)?;
1189 set.context().store().write_batch(batch).await?;
1190 set.post_save();
1191 assert!(!set.has_pending_changes().await);
1192
1193 set.remove(vec![1]);
1195 assert!(set.has_pending_changes().await);
1196
1197 set.clear();
1199 assert!(set.has_pending_changes().await);
1200
1201 set.insert(vec![4]);
1203 assert!(set.has_pending_changes().await);
1204
1205 set.rollback();
1207 assert!(!set.has_pending_changes().await);
1208
1209 assert!(set.contains(&[2]).await?);
1211 assert!(set.contains(&[3]).await?);
1212
1213 Ok(())
1214 }
1215
1216 #[tokio::test]
1217 async fn test_for_each_key_while_match_update_pattern() -> Result<(), ViewError> {
1218 let context = MemoryContext::new_for_testing(());
1219 let mut set = ByteSetView::load(context).await?;
1220
1221 set.insert(vec![1]);
1223 set.insert(vec![3]);
1224 set.insert(vec![5]);
1225 let mut batch = Batch::new();
1226 set.pre_save(&mut batch)?;
1227 set.context().store().write_batch(batch).await?;
1228 set.post_save();
1229
1230 set.insert(vec![2]); set.insert(vec![4]); let mut keys_processed = Vec::new();
1235
1236 set.for_each_key_while(|key| {
1239 keys_processed.push(key.to_vec());
1240 Ok(true) })
1242 .await?;
1243
1244 assert_eq!(
1246 keys_processed,
1247 vec![vec![1], vec![2], vec![3], vec![4], vec![5]]
1248 );
1249
1250 Ok(())
1251 }
1252
1253 #[tokio::test]
1254 async fn test_for_each_key_while_early_return() -> Result<(), ViewError> {
1255 let context = MemoryContext::new_for_testing(());
1256 let mut set = ByteSetView::load(context).await?;
1257
1258 set.insert(vec![1]);
1260 set.insert(vec![2]);
1261 set.insert(vec![3]);
1262 let mut batch = Batch::new();
1263 set.pre_save(&mut batch)?;
1264 set.context().store().write_batch(batch).await?;
1265 set.post_save();
1266
1267 let mut count = 0;
1268
1269 set.for_each_key_while(|_key| {
1271 count += 1;
1272 if count >= 2 {
1273 Ok(false) } else {
1275 Ok(true)
1276 }
1277 })
1278 .await?;
1279
1280 assert_eq!(count, 2);
1282
1283 Ok(())
1284 }
1285
1286 #[tokio::test]
1287 async fn test_hash_mut_delegation() -> Result<(), ViewError> {
1288 let context = MemoryContext::new_for_testing(());
1289 let mut set = ByteSetView::load(context).await?;
1290
1291 set.insert(vec![1, 2, 3]);
1293 set.insert(vec![4, 5, 6]);
1294
1295 let hash1 = set.hash_mut().await?;
1297 let hash2 = set.hash().await?;
1298
1299 assert_eq!(hash1, hash2);
1301
1302 set.insert(vec![7, 8, 9]);
1304 let hash3 = set.hash_mut().await?;
1305 assert_ne!(hash1, hash3);
1306
1307 Ok(())
1308 }
1309
1310 #[tokio::test]
1311 async fn test_for_each_key_while_early_return_on_update_set() -> Result<(), ViewError> {
1312 let context = MemoryContext::new_for_testing(());
1313 let mut set = ByteSetView::load(context).await?;
1314
1315 set.insert(vec![1]);
1317 set.insert(vec![3]);
1318 let mut batch = Batch::new();
1319 set.pre_save(&mut batch)?;
1320 set.context().store().write_batch(batch).await?;
1321 set.post_save();
1322
1323 set.insert(vec![0]); set.insert(vec![2]); let mut count = 0;
1328
1329 set.for_each_key_while(|key| {
1332 count += 1;
1333 if key == [0] {
1334 Ok(false) } else {
1336 Ok(true)
1337 }
1338 })
1339 .await?;
1340
1341 assert_eq!(count, 1);
1343
1344 Ok(())
1345 }
1346
1347 #[tokio::test]
1348 async fn test_for_each_key_while_early_return_in_remaining_updates() -> Result<(), ViewError> {
1349 let context = MemoryContext::new_for_testing(());
1350 let mut set = ByteSetView::load(context).await?;
1351
1352 set.insert(vec![1]);
1355 set.insert(vec![2]);
1356 set.insert(vec![3]);
1357
1358 let mut count = 0;
1359
1360 set.for_each_key_while(|key| {
1362 count += 1;
1363 if key == [2] {
1364 Ok(false) } else {
1366 Ok(true)
1367 }
1368 })
1369 .await?;
1370
1371 assert_eq!(count, 2);
1373
1374 Ok(())
1375 }
1376
1377 #[tokio::test]
1378 async fn test_contains_update_removed_returns_false() -> Result<(), ViewError> {
1379 let context = MemoryContext::new_for_testing(());
1380 let mut set = ByteSetView::load(context).await?;
1381
1382 assert!(!set.has_pending_changes().await);
1384
1385 set.insert(vec![1, 2, 3]);
1387
1388 assert!(set.has_pending_changes().await);
1390
1391 assert_eq!(set.keys().await?, vec![vec![1, 2, 3]]);
1393
1394 let mut batch = Batch::new();
1395 set.pre_save(&mut batch)?;
1396 set.context().store().write_batch(batch).await?;
1397 set.post_save();
1398
1399 assert!(!set.has_pending_changes().await);
1401
1402 assert_eq!(set.keys().await?, vec![vec![1, 2, 3]]);
1404
1405 assert!(set.contains(&[1, 2, 3]).await?);
1407
1408 set.remove(vec![1, 2, 3]);
1410
1411 assert!(set.has_pending_changes().await);
1413
1414 assert!(set.keys().await?.is_empty());
1416
1417 assert!(!set.contains(&[1, 2, 3]).await?);
1420
1421 Ok(())
1422 }
1423
1424 #[tokio::test]
1425 async fn test_contains_delete_storage_first_returns_false() -> Result<(), ViewError> {
1426 let context = MemoryContext::new_for_testing(());
1427 let mut set = ByteSetView::load(context).await?;
1428
1429 set.insert(vec![1]);
1431 set.insert(vec![2]);
1432 set.insert(vec![3]);
1433 let mut batch = Batch::new();
1434 set.pre_save(&mut batch)?;
1435 set.context().store().write_batch(batch).await?;
1436 set.post_save();
1437
1438 assert!(set.contains(&[1]).await?);
1440 assert!(set.contains(&[2]).await?);
1441 assert!(set.contains(&[3]).await?);
1442
1443 set.clear();
1445
1446 assert!(!set.contains(&[1]).await?);
1449 assert!(!set.contains(&[2]).await?);
1450 assert!(!set.contains(&[3]).await?);
1451
1452 assert!(!set.contains(&[99]).await?);
1454
1455 Ok(())
1456 }
1457
1458 #[tokio::test]
1459 async fn test_contains_delete_storage_first_with_new_additions() -> Result<(), ViewError> {
1460 let context = MemoryContext::new_for_testing(());
1461 let mut set = ByteSetView::load(context).await?;
1462
1463 set.insert(vec![1]);
1465 set.insert(vec![2]);
1466 let mut batch = Batch::new();
1467 set.pre_save(&mut batch)?;
1468 set.context().store().write_batch(batch).await?;
1469 set.post_save();
1470
1471 set.clear();
1473
1474 set.insert(vec![3]);
1476 set.insert(vec![4]);
1477
1478 assert!(!set.contains(&[1]).await?); assert!(!set.contains(&[2]).await?); assert!(set.contains(&[3]).await?); assert!(set.contains(&[4]).await?); Ok(())
1486 }
1487
1488 #[tokio::test]
1489 async fn test_for_each_key_while_update_set_processing_in_stored_loop() -> Result<(), ViewError>
1490 {
1491 let context = MemoryContext::new_for_testing(());
1492 let mut set = ByteSetView::load(context).await?;
1493
1494 set.insert(vec![2]);
1496 set.insert(vec![4]);
1497 set.insert(vec![6]);
1498 let mut batch = Batch::new();
1499 set.pre_save(&mut batch)?;
1500 set.context().store().write_batch(batch).await?;
1501 set.post_save();
1502
1503 set.insert(vec![1]); set.insert(vec![3]); set.remove(vec![5]); let mut processed_keys = Vec::new();
1509
1510 set.for_each_key_while(|key| {
1513 processed_keys.push(key.to_vec());
1514 Ok(true)
1515 })
1516 .await?;
1517
1518 assert_eq!(
1521 processed_keys,
1522 vec![vec![1], vec![2], vec![3], vec![4], vec![6]]
1523 );
1524
1525 Ok(())
1526 }
1527
1528 #[tokio::test]
1529 async fn test_set_view_flush_with_delete_storage_first_and_set_updates() -> Result<(), ViewError>
1530 {
1531 let context = MemoryContext::new_for_testing(());
1532 let mut set = SetView::<_, u32>::load(context).await?;
1533
1534 set.insert(&42)?;
1536 set.insert(&84)?;
1537 let mut batch = Batch::new();
1538 set.pre_save(&mut batch)?;
1539 set.context().store().write_batch(batch).await?;
1540 set.post_save();
1541
1542 set.clear();
1544
1545 set.insert(&123)?;
1547 set.insert(&456)?;
1548
1549 let mut batch = Batch::new();
1550 let delete_view = set.pre_save(&mut batch)?;
1551
1552 assert!(!delete_view);
1554
1555 set.context().store().write_batch(batch).await?;
1557 set.post_save();
1558 let new_set = SetView::<_, u32>::load(set.context().clone()).await?;
1559 assert!(new_set.contains(&123).await?);
1560 assert!(new_set.contains(&456).await?);
1561 assert!(!new_set.contains(&42).await?);
1562 assert!(!new_set.contains(&84).await?);
1563
1564 Ok(())
1565 }
1566
1567 #[tokio::test]
1568 async fn test_set_view_count_delegation() -> Result<(), ViewError> {
1569 let context = MemoryContext::new_for_testing(());
1570 let mut set = SetView::<_, u32>::load(context).await?;
1571
1572 assert!(!set.has_pending_changes().await);
1574
1575 assert_eq!(set.count().await?, 0);
1577
1578 set.insert(&42)?;
1580
1581 assert!(set.has_pending_changes().await);
1583
1584 assert_eq!(set.indices().await?, vec![42]);
1586
1587 set.insert(&84)?;
1588 set.insert(&126)?;
1589
1590 assert!(set.has_pending_changes().await);
1592
1593 assert_eq!(set.indices().await?, vec![42, 84, 126]);
1595
1596 assert_eq!(set.count().await?, 3);
1598
1599 Ok(())
1600 }
1601
1602 #[tokio::test]
1603 async fn test_set_view_hash_mut_delegation() -> Result<(), ViewError> {
1604 let context = MemoryContext::new_for_testing(());
1605 let mut set = SetView::<_, u32>::load(context).await?;
1606
1607 set.insert(&42)?;
1609 set.insert(&84)?;
1610 set.insert(&126)?;
1611
1612 let hash1 = set.hash_mut().await?;
1614 let hash2 = set.hash().await?;
1615
1616 assert_eq!(hash1, hash2);
1618
1619 set.insert(&168)?;
1621 let hash3 = set.hash_mut().await?;
1622 assert_ne!(hash1, hash3);
1623
1624 let context2 = MemoryContext::new_for_testing(());
1626 let mut byte_set = ByteSetView::load(context2).await?;
1627
1628 use crate::context::BaseKey;
1630 byte_set.insert(BaseKey::derive_short_key(&42u32)?);
1631 byte_set.insert(BaseKey::derive_short_key(&84u32)?);
1632 byte_set.insert(BaseKey::derive_short_key(&126u32)?);
1633 byte_set.insert(BaseKey::derive_short_key(&168u32)?);
1634
1635 let byte_set_hash = byte_set.hash_mut().await?;
1636 assert_eq!(hash3, byte_set_hash);
1637
1638 Ok(())
1639 }
1640
1641 #[tokio::test]
1643 async fn test_custom_set_view_flush_with_delete_storage_first_and_set_updates(
1644 ) -> Result<(), ViewError> {
1645 let context = MemoryContext::new_for_testing(());
1646 let mut set = CustomSetView::<_, u128>::load(context).await?;
1647
1648 assert!(!set.has_pending_changes().await);
1650
1651 set.insert(&42u128)?;
1653 set.insert(&84u128)?;
1654
1655 assert!(set.has_pending_changes().await);
1657
1658 assert_eq!(set.indices().await?, vec![42u128, 84u128]);
1660
1661 let mut batch = Batch::new();
1662 set.pre_save(&mut batch)?;
1663 set.context().store().write_batch(batch).await?;
1664 set.post_save();
1665
1666 assert!(!set.has_pending_changes().await);
1668
1669 assert_eq!(set.indices().await?, vec![42u128, 84u128]);
1671
1672 set.clear();
1674
1675 assert!(set.has_pending_changes().await);
1677
1678 assert!(set.indices().await?.is_empty());
1680
1681 set.insert(&123u128)?;
1683 set.insert(&456u128)?;
1684
1685 assert!(set.has_pending_changes().await);
1687
1688 assert_eq!(set.indices().await?, vec![123u128, 456u128]);
1690
1691 let mut batch = Batch::new();
1692 let delete_view = set.pre_save(&mut batch)?;
1693
1694 assert!(!delete_view);
1696
1697 set.context().store().write_batch(batch).await?;
1699 set.post_save();
1700
1701 assert!(!set.has_pending_changes().await);
1703
1704 let new_set = CustomSetView::<_, u128>::load(set.context().clone()).await?;
1705 assert!(new_set.contains(&123u128).await?);
1706 assert!(new_set.contains(&456u128).await?);
1707 assert!(!new_set.contains(&42u128).await?);
1708 assert!(!new_set.contains(&84u128).await?);
1709
1710 assert!(!new_set.has_pending_changes().await);
1712
1713 Ok(())
1714 }
1715
1716 #[tokio::test]
1717 async fn test_custom_set_view_contains_update_removed_returns_false() -> Result<(), ViewError> {
1718 let context = MemoryContext::new_for_testing(());
1719 let mut set = CustomSetView::<_, u128>::load(context).await?;
1720
1721 set.insert(&12345u128)?;
1723 let mut batch = Batch::new();
1724 set.pre_save(&mut batch)?;
1725 set.context().store().write_batch(batch).await?;
1726 set.post_save();
1727
1728 assert!(set.contains(&12345u128).await?);
1730
1731 set.remove(&12345u128)?;
1733
1734 assert!(!set.contains(&12345u128).await?);
1736
1737 Ok(())
1738 }
1739
1740 #[tokio::test]
1741 async fn test_custom_set_view_contains_delete_storage_first_returns_false(
1742 ) -> Result<(), ViewError> {
1743 let context = MemoryContext::new_for_testing(());
1744 let mut set = CustomSetView::<_, u128>::load(context).await?;
1745
1746 set.insert(&111u128)?;
1748 set.insert(&222u128)?;
1749 set.insert(&333u128)?;
1750 let mut batch = Batch::new();
1751 set.pre_save(&mut batch)?;
1752 set.context().store().write_batch(batch).await?;
1753 set.post_save();
1754
1755 assert!(set.contains(&111u128).await?);
1757 assert!(set.contains(&222u128).await?);
1758
1759 set.clear();
1761
1762 assert!(!set.contains(&111u128).await?);
1764 assert!(!set.contains(&222u128).await?);
1765 assert!(!set.contains(&333u128).await?);
1766
1767 Ok(())
1768 }
1769
1770 #[tokio::test]
1771 async fn test_custom_set_view_hash_mut_delegation() -> Result<(), ViewError> {
1772 let context = MemoryContext::new_for_testing(());
1773 let mut set = CustomSetView::<_, u128>::load(context).await?;
1774
1775 set.insert(&1000u128)?;
1777 set.insert(&2000u128)?;
1778
1779 let hash1 = set.hash_mut().await?;
1781 let hash2 = set.hash().await?;
1782
1783 assert_eq!(hash1, hash2);
1785
1786 set.insert(&3000u128)?;
1788 let hash3 = set.hash_mut().await?;
1789 assert_ne!(hash1, hash3);
1790
1791 Ok(())
1792 }
1793
1794 #[tokio::test]
1795 async fn test_custom_set_view_for_each_index_while_method_signature() -> Result<(), ViewError> {
1796 let context = MemoryContext::new_for_testing(());
1797 let mut set = CustomSetView::<_, u128>::load(context).await?;
1798 assert_eq!(set.count().await?, 0);
1799
1800 set.insert(&100u128)?;
1802 set.insert(&200u128)?;
1803 set.insert(&300u128)?;
1804
1805 assert_eq!(set.count().await?, 3);
1806
1807 let mut collected_indices = Vec::new();
1808
1809 set.for_each_index_while(|index| {
1812 collected_indices.push(index);
1813 Ok(true)
1814 })
1815 .await?;
1816
1817 assert_eq!(collected_indices, vec![100u128, 200u128, 300u128]);
1819
1820 Ok(())
1821 }
1822
1823 #[tokio::test]
1824 async fn test_custom_set_view_rollback() -> Result<(), ViewError> {
1825 let context = MemoryContext::new_for_testing(());
1826 let mut set = CustomSetView::<_, u128>::load(context).await?;
1827
1828 set.insert(&100u128)?;
1830 set.insert(&200u128)?;
1831 let mut batch = Batch::new();
1832 set.pre_save(&mut batch)?;
1833 set.context().store().write_batch(batch).await?;
1834 set.post_save();
1835
1836 assert!(set.contains(&100u128).await?);
1838 assert!(set.contains(&200u128).await?);
1839 assert!(!set.has_pending_changes().await);
1840
1841 set.insert(&300u128)?;
1843 set.remove(&100u128)?;
1844 assert!(set.has_pending_changes().await);
1845
1846 assert!(set.contains(&300u128).await?);
1848 assert!(!set.contains(&100u128).await?);
1849
1850 set.rollback();
1852
1853 assert!(!set.has_pending_changes().await);
1855 assert!(set.contains(&100u128).await?);
1856 assert!(set.contains(&200u128).await?);
1857 assert!(!set.contains(&300u128).await?);
1858
1859 Ok(())
1860 }
1861
1862 #[tokio::test]
1863 async fn test_custom_set_view_clone_unchecked() -> Result<(), ViewError> {
1864 let context = MemoryContext::new_for_testing(());
1865 let mut set = CustomSetView::<_, u128>::load(context).await?;
1866
1867 set.insert(&42u128)?;
1869 set.insert(&84u128)?;
1870
1871 let mut cloned_set = set.clone_unchecked()?;
1873
1874 assert!(cloned_set.contains(&42u128).await?);
1876 assert!(cloned_set.contains(&84u128).await?);
1877
1878 cloned_set.insert(&126u128)?;
1880 assert!(cloned_set.contains(&126u128).await?);
1881 assert!(!set.contains(&126u128).await?);
1882
1883 set.insert(&168u128)?;
1885 assert!(set.contains(&168u128).await?);
1886 assert!(!cloned_set.contains(&168u128).await?);
1887
1888 Ok(())
1889 }
1890
1891 #[cfg(with_graphql)]
1892 mod graphql_tests {
1893 use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema};
1894
1895 use super::*;
1896
1897 struct Query;
1899
1900 #[Object]
1901 impl Query {
1902 async fn test_set(&self) -> TestSetView {
1903 let context = MemoryContext::new_for_testing(());
1904 let mut set = SetView::<_, u32>::load(context).await.unwrap();
1905
1906 set.insert(&42).unwrap();
1908 set.insert(&84).unwrap();
1909 set.insert(&126).unwrap();
1910 set.insert(&168).unwrap();
1911 set.insert(&210).unwrap();
1912
1913 TestSetView { set }
1914 }
1915 }
1916
1917 struct TestSetView {
1918 set: SetView<MemoryContext<()>, u32>,
1919 }
1920
1921 #[Object]
1922 impl TestSetView {
1923 async fn elements(
1924 &self,
1925 count: Option<usize>,
1926 ) -> Result<Vec<u32>, async_graphql::Error> {
1927 let mut indices = self.set.indices().await?;
1929 if let Some(count) = count {
1930 indices.truncate(count);
1932 }
1933 Ok(indices)
1934 }
1935
1936 async fn count(&self) -> Result<u32, async_graphql::Error> {
1937 Ok(self.set.count().await? as u32)
1938 }
1939 }
1940
1941 #[tokio::test]
1942 async fn test_graphql_elements_without_count() -> Result<(), Box<dyn std::error::Error>> {
1943 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1944
1945 let query = r#"
1947 query {
1948 testSet {
1949 elements
1950 }
1951 }
1952 "#;
1953
1954 let result = schema.execute(query).await;
1955 assert!(result.errors.is_empty());
1956
1957 let data = result.data.into_json()?;
1958 let elements = &data["testSet"]["elements"];
1959 assert!(elements.is_array());
1960 assert_eq!(elements.as_array().unwrap().len(), 5);
1961
1962 Ok(())
1963 }
1964
1965 #[tokio::test]
1966 async fn test_graphql_elements_with_count() -> Result<(), Box<dyn std::error::Error>> {
1967 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1968
1969 let query = r#"
1971 query {
1972 testSet {
1973 elements(count: 3)
1974 }
1975 }
1976 "#;
1977
1978 let result = schema.execute(query).await;
1979 assert!(result.errors.is_empty());
1980
1981 let data = result.data.into_json()?;
1982 let elements = &data["testSet"]["elements"];
1983 assert!(elements.is_array());
1984 assert_eq!(elements.as_array().unwrap().len(), 3);
1986
1987 Ok(())
1988 }
1989
1990 #[tokio::test]
1991 async fn test_graphql_count_field() -> Result<(), Box<dyn std::error::Error>> {
1992 let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
1993
1994 let query = r#"
1995 query {
1996 testSet {
1997 count
1998 }
1999 }
2000 "#;
2001
2002 let result = schema.execute(query).await;
2003 assert!(result.errors.is_empty());
2004
2005 let data = result.data.into_json()?;
2006 let count = &data["testSet"]["count"];
2007 assert_eq!(count.as_u64().unwrap(), 5);
2008
2009 Ok(())
2010 }
2011 }
2012}