1use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};
17
18use allocative::Allocative;
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{ensure, visit_allocative_simple};
22
23use crate::{
24 batch::{Batch, WriteOperation},
25 common::{
26 from_bytes_option, get_key_range_for_prefix, get_upper_bound, DeletionSet, HasherOutput,
27 SuffixClosedSetIterator, Update,
28 },
29 context::Context,
30 hashable_wrapper::WrappedHashableContainerView,
31 historical_hash_wrapper::HistoricallyHashableView,
32 store::ReadableKeyValueStore,
33 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
34};
35
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-free latency histogram using the bucket layout shared by
    /// every `KeyValueStoreView` metric (`exponential_bucket_latencies(5.0)`).
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(5.0))
    }

    /// Latency of computing the hash of a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
        )
    });

    /// Latency of `KeyValueStoreView::get`.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
        )
    });

    /// Latency of `KeyValueStoreView::multi_get`.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
            )
        });

    /// Latency of `KeyValueStoreView::contains_key`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
            )
        });

    /// Latency of `KeyValueStoreView::contains_keys`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
            )
        });

    /// Latency of `KeyValueStoreView::find_keys_by_prefix`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
            )
        });

    /// Latency of `KeyValueStoreView::find_key_values_by_prefix`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
            )
        });

    /// Latency of `KeyValueStoreView::write_batch`.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
            )
        });
}
129
130#[cfg(with_testing)]
131use {
132 crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
133 async_lock::RwLock,
134 std::sync::Arc,
135 thiserror::Error,
136};
137
/// Tags appended to the view's base key to separate the kinds of keys it owns.
#[repr(u8)]
enum KeyTag {
    /// Prefix under which the user-provided indices (the key-value entries) live.
    Index = MIN_VIEW_TAG,
    /// Key under which the hash of the view is persisted.
    Hash,
}
145
/// A view exposing key-value store style operations (get, put, delete, prefix
/// search) on top of a context, buffering changes in memory until they are saved.
///
/// Pending changes are tracked in `deletion_set` and `updates`; reads consult those
/// first and only then fall back to the store of the context.
#[derive(Debug, Allocative)]
#[allocative(bound = "C")]
pub struct KeyValueStoreView<C> {
    /// The context providing the base key and the underlying store.
    #[allocative(skip)]
    context: C,
    /// Pending deletions: whether the whole storage must be wiped first, and which
    /// key prefixes have been deleted since the last save.
    deletion_set: DeletionSet,
    /// Pending per-key changes (insertions and removals), ordered by index.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// The hash as last loaded from, or saved to, storage.
    #[allocative(visit = visit_allocative_simple)]
    stored_hash: Option<HasherOutput>,
    /// The cached hash of the current in-memory state; `None` when it has to be
    /// recomputed. Kept behind a mutex so that `hash(&self)` can fill the cache.
    #[allocative(visit = visit_allocative_simple)]
    hash: Mutex<Option<HasherOutput>>,
}
180
181impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
182 type Target = KeyValueStoreView<C2>;
183
184 async fn with_context(
185 &mut self,
186 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
187 ) -> Self::Target {
188 let hash = *self.hash.lock().unwrap();
189 KeyValueStoreView {
190 context: ctx.clone()(&self.context),
191 deletion_set: self.deletion_set.clone(),
192 updates: self.updates.clone(),
193 stored_hash: self.stored_hash,
194 hash: Mutex::new(hash),
195 }
196 }
197}
198
199impl<C: Context> View for KeyValueStoreView<C> {
200 const NUM_INIT_KEYS: usize = 1;
201
202 type Context = C;
203
204 fn context(&self) -> C {
205 self.context.clone()
206 }
207
208 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
209 let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
210 Ok(vec![key_hash])
211 }
212
213 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
214 let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
215 Ok(Self {
216 context,
217 deletion_set: DeletionSet::new(),
218 updates: BTreeMap::new(),
219 stored_hash: hash,
220 hash: Mutex::new(hash),
221 })
222 }
223
224 fn rollback(&mut self) {
225 self.deletion_set.rollback();
226 self.updates.clear();
227 *self.hash.get_mut().unwrap() = self.stored_hash;
228 }
229
230 async fn has_pending_changes(&self) -> bool {
231 if self.deletion_set.has_pending_changes() {
232 return true;
233 }
234 if !self.updates.is_empty() {
235 return true;
236 }
237 let hash = self.hash.lock().unwrap();
238 self.stored_hash != *hash
239 }
240
241 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
242 let mut delete_view = false;
243 if self.deletion_set.delete_storage_first {
244 delete_view = true;
245 batch.delete_key_prefix(self.context.base_key().bytes.clone());
246 for (index, update) in self.updates.iter() {
247 if let Update::Set(value) = update {
248 let key = self
249 .context
250 .base_key()
251 .base_tag_index(KeyTag::Index as u8, index);
252 batch.put_key_value_bytes(key, value.clone());
253 delete_view = false;
254 }
255 }
256 } else {
257 for index in self.deletion_set.deleted_prefixes.iter() {
258 let key = self
259 .context
260 .base_key()
261 .base_tag_index(KeyTag::Index as u8, index);
262 batch.delete_key_prefix(key);
263 }
264 for (index, update) in self.updates.iter() {
265 let key = self
266 .context
267 .base_key()
268 .base_tag_index(KeyTag::Index as u8, index);
269 match update {
270 Update::Removed => batch.delete_key(key),
271 Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
272 }
273 }
274 }
275 let hash = *self.hash.lock().unwrap();
276 if self.stored_hash != hash {
277 let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
278 match hash {
279 None => batch.delete_key(key),
280 Some(hash) => batch.put_key_value(key, &hash)?,
281 }
282 }
283 Ok(delete_view)
284 }
285
286 fn post_save(&mut self) {
287 self.deletion_set.delete_storage_first = false;
288 self.deletion_set.deleted_prefixes.clear();
289 self.updates.clear();
290 let hash = *self.hash.lock().unwrap();
291 self.stored_hash = hash;
292 }
293
294 fn clear(&mut self) {
295 self.deletion_set.clear();
296 self.updates.clear();
297 *self.hash.get_mut().unwrap() = None;
298 }
299}
300
301impl<C: Context> ClonableView for KeyValueStoreView<C> {
302 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
303 Ok(KeyValueStoreView {
304 context: self.context.clone(),
305 deletion_set: self.deletion_set.clone(),
306 updates: self.updates.clone(),
307 stored_hash: self.stored_hash,
308 hash: Mutex::new(*self.hash.get_mut().unwrap()),
309 })
310 }
311}
312
313impl<C: Context> KeyValueStoreView<C> {
314 fn max_key_size(&self) -> usize {
315 let prefix_len = self.context.base_key().bytes.len();
316 <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
317 }
318
    /// Applies `f` to every index of the view, stopping as soon as `f` returns
    /// `Ok(false)`.
    ///
    /// The indices come from merging the keys found in the store with the pending
    /// `updates`: pending insertions are visited, pending removals and keys covered
    /// by a deleted prefix are skipped. When the storage is marked to be deleted
    /// first, the store is not read at all.
    ///
    /// The merge relies on the store returning its keys in the same ascending order
    /// as the `updates` map — NOTE(review): confirm against the store contract.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the store or by `f`.
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        // `update` is the next pending change not yet merged into the output.
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // The keys returned here are compared directly with the indices of
            // `updates`, i.e. they are treated as indices relative to `key_prefix`.
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
            {
                loop {
                    match update {
                        // Pending changes at or before the stored index go first.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            // The pending change overrides the stored entry, so
                            // the stored index itself is not visited.
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            // No pending change for this stored index: visit it
                            // unless `find_key` reports it as covered by a deleted
                            // prefix.
                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Pending changes located after the last stored index.
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }
390
391 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
413 where
414 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
415 {
416 self.for_each_index_while(|key| {
417 f(key)?;
418 Ok(true)
419 })
420 .await
421 }
422
    /// Applies `f` to every index/value pair of the view, stopping as soon as `f`
    /// returns `Ok(false)`.
    ///
    /// The pairs come from merging the entries found in the store with the pending
    /// `updates`: pending insertions are visited with their new value, pending
    /// removals and entries covered by a deleted prefix are skipped. When the
    /// storage is marked to be deleted first, the store is not read at all.
    ///
    /// The merge relies on the store returning its entries in the same ascending
    /// key order as the `updates` map — NOTE(review): confirm against the store
    /// contract.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the store or by `f`.
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        // `update` is the next pending change not yet merged into the output.
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
            {
                let (index, index_val) = entry;
                loop {
                    match update {
                        // Pending changes at or before the stored index go first.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            // The pending change overrides the stored entry, so
                            // the stored value is not visited.
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            // No pending change for this stored entry: visit it
                            // unless `find_key` reports it as covered by a deleted
                            // prefix.
                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Pending changes located after the last stored index.
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }
494
495 pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
516 where
517 F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
518 {
519 self.for_each_index_value_while(|key, value| {
520 f(key, value)?;
521 Ok(true)
522 })
523 .await
524 }
525
526 pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
541 let mut indices = Vec::new();
542 self.for_each_index(|index| {
543 indices.push(index.to_vec());
544 Ok(())
545 })
546 .await?;
547 Ok(indices)
548 }
549
550 pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
565 let mut index_values = Vec::new();
566 self.for_each_index_value(|index, value| {
567 index_values.push((index.to_vec(), value.to_vec()));
568 Ok(())
569 })
570 .await?;
571 Ok(index_values)
572 }
573
574 pub async fn count(&self) -> Result<usize, ViewError> {
589 let mut count = 0;
590 self.for_each_index(|_index| {
591 count += 1;
592 Ok(())
593 })
594 .await?;
595 Ok(count)
596 }
597
598 pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
612 #[cfg(with_metrics)]
613 let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
614 ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
615 if let Some(update) = self.updates.get(index) {
616 let value = match update {
617 Update::Removed => None,
618 Update::Set(value) => Some(value.clone()),
619 };
620 return Ok(value);
621 }
622 if self.deletion_set.contains_prefix_of(index) {
623 return Ok(None);
624 }
625 let key = self
626 .context
627 .base_key()
628 .base_tag_index(KeyTag::Index as u8, index);
629 Ok(self.context.store().read_value_bytes(&key).await?)
630 }
631
632 pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
646 #[cfg(with_metrics)]
647 let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
648 ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
649 if let Some(update) = self.updates.get(index) {
650 let test = match update {
651 Update::Removed => false,
652 Update::Set(_value) => true,
653 };
654 return Ok(test);
655 }
656 if self.deletion_set.contains_prefix_of(index) {
657 return Ok(false);
658 }
659 let key = self
660 .context
661 .base_key()
662 .base_tag_index(KeyTag::Index as u8, index);
663 Ok(self.context.store().contains_key(&key).await?)
664 }
665
666 pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
681 #[cfg(with_metrics)]
682 let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
683 let mut results = Vec::with_capacity(indices.len());
684 let mut missed_indices = Vec::new();
685 let mut vector_query = Vec::new();
686 for (i, index) in indices.iter().enumerate() {
687 ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
688 if let Some(update) = self.updates.get(index) {
689 let value = match update {
690 Update::Removed => false,
691 Update::Set(_) => true,
692 };
693 results.push(value);
694 } else {
695 results.push(false);
696 if !self.deletion_set.contains_prefix_of(index) {
697 missed_indices.push(i);
698 let key = self
699 .context
700 .base_key()
701 .base_tag_index(KeyTag::Index as u8, index);
702 vector_query.push(key);
703 }
704 }
705 }
706 let values = self.context.store().contains_keys(&vector_query).await?;
707 for (i, value) in missed_indices.into_iter().zip(values) {
708 results[i] = value;
709 }
710 Ok(results)
711 }
712
713 pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
729 #[cfg(with_metrics)]
730 let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
731 let mut result = Vec::with_capacity(indices.len());
732 let mut missed_indices = Vec::new();
733 let mut vector_query = Vec::new();
734 for (i, index) in indices.iter().enumerate() {
735 ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
736 if let Some(update) = self.updates.get(index) {
737 let value = match update {
738 Update::Removed => None,
739 Update::Set(value) => Some(value.clone()),
740 };
741 result.push(value);
742 } else {
743 result.push(None);
744 if !self.deletion_set.contains_prefix_of(index) {
745 missed_indices.push(i);
746 let key = self
747 .context
748 .base_key()
749 .base_tag_index(KeyTag::Index as u8, index);
750 vector_query.push(key);
751 }
752 }
753 }
754 let values = self
755 .context
756 .store()
757 .read_multi_values_bytes(&vector_query)
758 .await?;
759 for (i, value) in missed_indices.into_iter().zip(values) {
760 result[i] = value;
761 }
762 Ok(result)
763 }
764
765 pub fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
784 #[cfg(with_metrics)]
785 let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
786 *self.hash.get_mut().unwrap() = None;
787 let max_key_size = self.max_key_size();
788 for operation in batch.operations {
789 match operation {
790 WriteOperation::Delete { key } => {
791 ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
792 if self.deletion_set.contains_prefix_of(&key) {
793 self.updates.remove(&key);
795 } else {
796 self.updates.insert(key, Update::Removed);
797 }
798 }
799 WriteOperation::Put { key, value } => {
800 ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
801 self.updates.insert(key, Update::Set(value));
802 }
803 WriteOperation::DeletePrefix { key_prefix } => {
804 ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
805 let key_list = self
806 .updates
807 .range(get_key_range_for_prefix(key_prefix.clone()))
808 .map(|x| x.0.to_vec())
809 .collect::<Vec<_>>();
810 for key in key_list {
811 self.updates.remove(&key);
812 }
813 self.deletion_set.insert_key_prefix(key_prefix);
814 }
815 }
816 }
817 Ok(())
818 }
819
820 pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
833 let mut batch = Batch::new();
834 batch.put_key_value_bytes(index, value);
835 self.write_batch(batch)
836 }
837
838 pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
852 let mut batch = Batch::new();
853 batch.delete_key(index);
854 self.write_batch(batch)
855 }
856
857 pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
871 let mut batch = Batch::new();
872 batch.delete_key_prefix(key_prefix);
873 self.write_batch(batch)
874 }
875
    /// Returns the indices of the view that start with `key_prefix`, with the
    /// prefix stripped from each returned key.
    ///
    /// The result merges the keys found in the store with the pending `updates`
    /// restricted to the prefix: pending insertions are included, pending removals
    /// and keys covered by a deleted prefix are left out. When the storage is
    /// marked to be deleted first, the store is not read at all.
    ///
    /// The merge relies on the store returning its keys in the same ascending order
    /// as the `updates` map — NOTE(review): confirm against the store contract.
    ///
    /// # Errors
    ///
    /// Returns `ViewError::KeyTooLong` if `key_prefix` exceeds the maximal key
    /// size, or any error produced by the store.
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        // Pending changes whose index lies in the range covered by the prefix.
        // Their keys are full indices, hence the `[len..]` slicing below.
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // `key` is compared with the stripped update keys, i.e. it is treated
            // as relative to `key_prefix_full`.
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        // Pending changes at or before the stored key go first.
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            // The pending change overrides the stored key.
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // The deleted prefixes are expressed on full indices,
                            // so the prefix is put back before the lookup.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Pending changes located after the last stored key.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }
951
    /// Returns the key-value pairs of the view whose index starts with
    /// `key_prefix`, with the prefix stripped from each returned key.
    ///
    /// The result merges the entries found in the store with the pending `updates`
    /// restricted to the prefix: pending insertions are included with their new
    /// value, pending removals and entries covered by a deleted prefix are left
    /// out. When the storage is marked to be deleted first, the store is not read
    /// at all.
    ///
    /// The merge relies on the store returning its entries in the same ascending
    /// key order as the `updates` map — NOTE(review): confirm against the store
    /// contract.
    ///
    /// # Errors
    ///
    /// Returns `ViewError::KeyTooLong` if `key_prefix` exceeds the maximal key
    /// size, or any error produced by the store.
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        // Pending changes whose index lies in the range covered by the prefix.
        // Their keys are full indices, hence the `[len..]` slicing below.
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                loop {
                    match update {
                        // Pending changes at or before the stored key go first.
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            // The pending change overrides the stored entry.
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // The deleted prefixes are expressed on full indices,
                            // so the prefix is put back before the lookup.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Pending changes located after the last stored key.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }
1032
1033 async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1034 #[cfg(with_metrics)]
1035 let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1036 let mut hasher = sha3::Sha3_256::default();
1037 let mut count = 0u32;
1038 self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1039 count += 1;
1040 hasher.update_with_bytes(index)?;
1041 hasher.update_with_bytes(value)?;
1042 Ok(())
1043 })
1044 .await?;
1045 hasher.update_with_bcs_bytes(&count)?;
1046 Ok(hasher.finalize())
1047 }
1048}
1049
1050impl<C: Context> HashableView for KeyValueStoreView<C> {
1051 type Hasher = sha3::Sha3_256;
1052
1053 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1054 let hash = *self.hash.get_mut().unwrap();
1055 match hash {
1056 Some(hash) => Ok(hash),
1057 None => {
1058 let new_hash = self.compute_hash().await?;
1059 let hash = self.hash.get_mut().unwrap();
1060 *hash = Some(new_hash);
1061 Ok(new_hash)
1062 }
1063 }
1064 }
1065
1066 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1067 let hash = *self.hash.lock().unwrap();
1068 match hash {
1069 Some(hash) => Ok(hash),
1070 None => {
1071 let new_hash = self.compute_hash().await?;
1072 let mut hash = self.hash.lock().unwrap();
1073 *hash = Some(new_hash);
1074 Ok(new_hash)
1075 }
1076 }
1077 }
1078}
1079
/// Type alias for a [`KeyValueStoreView`] wrapped in a
/// [`WrappedHashableContainerView`] using [`HasherOutput`] as the hash type.
pub type HashedKeyValueStoreView<C> =
    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;
1083
/// Type alias for a [`KeyValueStoreView`] wrapped in a [`HistoricallyHashableView`].
pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1086
/// A key-value store built on top of a [`KeyValueStoreView`], available for
/// testing only.
///
/// Cloning the container shares the same underlying view.
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    /// The wrapped view, shared between clones and guarded by an async
    /// read-write lock.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1093
/// Declares [`ViewContainerError`] as the error type of the [`ViewContainer`] store.
#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    type Error = ViewContainerError;
}
1098
/// The error type of [`ViewContainer`] operations.
#[cfg(with_testing)]
#[derive(Error, Debug)]
pub enum ViewContainerError {
    /// An error reported by the underlying view.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1111
/// Identifies the backend reporting a [`ViewContainerError`] as `"view_container"`.
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    const BACKEND: &'static str = "view_container";
}
1116
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    /// Same limit as the store underlying the wrapped view.
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    /// The container always reports a single stream query.
    fn max_stream_queries(&self) -> usize {
        1
    }

    /// The container has an empty root key.
    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
        Ok(Vec::new())
    }

    /// Reads the value at `key` through the wrapped view.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        guard.get(key).await.map_err(ViewContainerError::from)
    }

    /// Tests the presence of `key` through the wrapped view.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        let guard = self.view.read().await;
        guard
            .contains_key(key)
            .await
            .map_err(ViewContainerError::from)
    }

    /// Tests the presence of each of `keys` through the wrapped view.
    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
        let guard = self.view.read().await;
        guard
            .contains_keys(keys)
            .await
            .map_err(ViewContainerError::from)
    }

    /// Reads the values at each of `keys` through the wrapped view.
    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        let guard = self.view.read().await;
        guard
            .multi_get(keys)
            .await
            .map_err(ViewContainerError::from)
    }

    /// Lists the keys starting with `key_prefix` through the wrapped view.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        guard
            .find_keys_by_prefix(key_prefix)
            .await
            .map_err(ViewContainerError::from)
    }

    /// Lists the key-value pairs whose key starts with `key_prefix` through the
    /// wrapped view.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        let guard = self.view.read().await;
        guard
            .find_key_values_by_prefix(key_prefix)
            .await
            .map_err(ViewContainerError::from)
    }
}
1168
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    /// Same limit as the store underlying the wrapped view.
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies `batch` to the wrapped view and persists the result to the store
    /// of its context.
    ///
    /// The view is marked as saved (`post_save`) only after the underlying store
    /// accepted the write. On any failure the in-memory view is rolled back, so
    /// that it keeps mirroring what is actually stored: no partially applied
    /// batch stays pending and the stored hash is not overwritten prematurely.
    ///
    /// # Errors
    ///
    /// Returns the error of the view (e.g. a key that is too long) or of the
    /// underlying store, converted into a [`ViewContainerError`].
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut view = self.view.write().await;
        if let Err(error) = view.write_batch(batch) {
            // Drop the operations applied before the offending one.
            view.rollback();
            return Err(error.into());
        }
        let mut batch = Batch::new();
        if let Err(error) = view.pre_save(&mut batch) {
            view.rollback();
            return Err(error.into());
        }
        let written = view.context().store().write_batch(batch).await;
        match written {
            Ok(()) => {
                view.post_save();
                Ok(())
            }
            Err(error) => {
                // Nothing reached the store: forget the pending changes rather
                // than pretending they were saved.
                view.rollback();
                Err(ViewError::from(error).into())
            }
        }
    }

    /// The container keeps no journal, so there is nothing to clear.
    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}
1191
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Creates a container by loading a [`KeyValueStoreView`] from `context`.
    ///
    /// # Errors
    ///
    /// Returns the error reported while loading the view.
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let loaded = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(loaded)),
        })
    }
}