use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};

use allocative::Allocative;
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{data_types::ArithmeticError, ensure, visit_allocative_simple};
use serde::{Deserialize, Serialize};

use crate::{
    batch::{Batch, WriteOperation},
    common::{
        from_bytes_option, from_bytes_option_or_default, get_key_range_for_prefix, get_upper_bound,
        DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
    },
    context::Context,
    hashable_wrapper::WrappedHashableContainerView,
    historical_hash_wrapper::HistoricallyHashableView,
    map_view::ByteMapView,
    store::ReadableKeyValueStore,
    views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
};

#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });
}

#[cfg(with_testing)]
use {
    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
    async_lock::RwLock,
    std::sync::Arc,
    thiserror::Error,
};

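/// Key tags used to build the sub-keys of a [`KeyValueStoreView`] on top of the base key.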
#[repr(u8)]
enum KeyTag {
    /// Prefix for the indices of the view.
    Index = MIN_VIEW_TAG,
    /// Prefix for the total size of the view.
    TotalSize,
    /// Prefix for the [`ByteMapView`] storing the sizes of the entries.
    Sizes,
    /// Prefix for the hash value.
    Hash,
}

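/// A storage size, split into the number of key bytes and the number of value bytes.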
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Allocative)]
pub struct SizeData {
    /// The size of the keys, in bytes.
    pub key: u32,
    /// The size of the values, in bytes.
    pub value: u32,
}

impl SizeData {
    /// Returns the total size, i.e. the sum of the key and value sizes.
    pub fn sum(&mut self) -> u32 {
        self.key + self.value
    }

    /// Adds a size to `self`, failing with an arithmetic error on overflow.
    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
        self.key = self
            .key
            .checked_add(size.key)
            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
        self.value = self
            .value
            .checked_add(size.value)
            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
        Ok(())
    }

    /// Subtracts a size from `self`. The caller must ensure that `size` was previously
    /// accounted for, so that no underflow can occur.
    pub fn sub_assign(&mut self, size: SizeData) {
        self.key -= size.key;
        self.value -= size.value;
    }
}

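/// A view that exposes the interface of a key-value store over arbitrary byte keys:
/// reads, writes, and prefix queries. Changes are staged in memory (`updates` and
/// `deletion_set`) until the view is saved, the total size of the entries is tracked,
/// and a hash of the contents is cached.
///
/// The sketch below shows the intended usage. It assumes some way of obtaining a
/// context for testing; the constructor name used here is a placeholder, not part of
/// this module.
///
/// ```ignore
/// // `new_test_context()` is a hypothetical helper; use the test context of your setup.
/// let context = new_test_context();
/// let mut view = KeyValueStoreView::load(context).await?;
/// view.insert(vec![0, 1], vec![42]).await?;
/// assert_eq!(view.get(&[0, 1]).await?, Some(vec![42]));
/// view.remove(vec![0, 1]).await?;
/// assert_eq!(view.get(&[0, 1]).await?, None);
/// ```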
#[derive(Debug, Allocative)]
#[allocative(bound = "C")]
pub struct KeyValueStoreView<C> {
    /// The view's context.
    #[allocative(skip)]
    context: C,
    /// The key prefixes deleted since the last save, and whether the whole storage
    /// should be cleared first.
    deletion_set: DeletionSet,
    /// The insertions and removals staged since the last save.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// The total size as persisted in storage.
    stored_total_size: SizeData,
    /// The current total size, including staged changes.
    total_size: SizeData,
    /// The sizes of the individual entries.
    sizes: ByteMapView<C, u32>,
    /// The hash as persisted in storage, if any.
    #[allocative(visit = visit_allocative_simple)]
    stored_hash: Option<HasherOutput>,
    /// The cached hash of the current contents, if already computed.
    #[allocative(visit = visit_allocative_simple)]
    hash: Mutex<Option<HasherOutput>>,
}

impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
    type Target = KeyValueStoreView<C2>;

    async fn with_context(
        &mut self,
        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
    ) -> Self::Target {
        let hash = *self.hash.lock().unwrap();
        KeyValueStoreView {
            context: ctx.clone()(&self.context),
            deletion_set: self.deletion_set.clone(),
            updates: self.updates.clone(),
            stored_total_size: self.stored_total_size,
            total_size: self.total_size,
            sizes: self.sizes.with_context(ctx.clone()).await,
            stored_hash: self.stored_hash,
            hash: Mutex::new(hash),
        }
    }
}

impl<C: Context> View for KeyValueStoreView<C> {
    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;

    type Context = C;

    fn context(&self) -> C {
        self.context.clone()
    }

    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
        let mut v = vec![key_hash, key_total_size];
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
        Ok(v)
    }

    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        let total_size =
            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        let sizes = ByteMapView::post_load(
            context_sizes,
            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
        )?;
        Ok(Self {
            context,
            deletion_set: DeletionSet::new(),
            updates: BTreeMap::new(),
            stored_total_size: total_size,
            total_size,
            sizes,
            stored_hash: hash,
            hash: Mutex::new(hash),
        })
    }

    fn rollback(&mut self) {
        self.deletion_set.rollback();
        self.updates.clear();
        self.total_size = self.stored_total_size;
        self.sizes.rollback();
        *self.hash.get_mut().unwrap() = self.stored_hash;
    }

    async fn has_pending_changes(&self) -> bool {
        if self.deletion_set.has_pending_changes() {
            return true;
        }
        if !self.updates.is_empty() {
            return true;
        }
        if self.stored_total_size != self.total_size {
            return true;
        }
        if self.sizes.has_pending_changes().await {
            return true;
        }
        let hash = self.hash.lock().unwrap();
        self.stored_hash != *hash
    }

    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.deletion_set.delete_storage_first {
            delete_view = true;
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            for (index, update) in self.updates.iter() {
                if let Update::Set(value) = update {
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    batch.put_key_value_bytes(key, value.clone());
                    delete_view = false;
                }
            }
        } else {
            for index in self.deletion_set.deleted_prefixes.iter() {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, index);
                batch.delete_key_prefix(key);
            }
            for (index, update) in self.updates.iter() {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, index);
                match update {
                    Update::Removed => batch.delete_key(key),
                    Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
                }
            }
        }
        self.sizes.pre_save(batch)?;
        let hash = *self.hash.lock().unwrap();
        if self.stored_hash != hash {
            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
            match hash {
                None => batch.delete_key(key),
                Some(hash) => batch.put_key_value(key, &hash)?,
            }
        }
        if self.stored_total_size != self.total_size {
            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
            batch.put_key_value(key, &self.total_size)?;
        }
        Ok(delete_view)
    }

    fn post_save(&mut self) {
        self.deletion_set.delete_storage_first = false;
        self.deletion_set.deleted_prefixes.clear();
        self.updates.clear();
        self.sizes.post_save();
        let hash = *self.hash.lock().unwrap();
        self.stored_hash = hash;
        self.stored_total_size = self.total_size;
    }

    fn clear(&mut self) {
        self.deletion_set.clear();
        self.updates.clear();
        self.total_size = SizeData::default();
        self.sizes.clear();
        *self.hash.get_mut().unwrap() = None;
    }
}

impl<C: Context> ClonableView for KeyValueStoreView<C> {
    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
        Ok(KeyValueStoreView {
            context: self.context.clone(),
            deletion_set: self.deletion_set.clone(),
            updates: self.updates.clone(),
            stored_total_size: self.stored_total_size,
            total_size: self.total_size,
            sizes: self.sizes.clone_unchecked()?,
            stored_hash: self.stored_hash,
            hash: Mutex::new(*self.hash.get_mut().unwrap()),
        })
    }
}

impl<C: Context> KeyValueStoreView<C> {
    /// Returns the maximum key size allowed for this view, i.e. the store's maximum
    /// key size minus the space taken by the base key and the key tag.
    fn max_key_size(&self) -> usize {
        let prefix_len = self.context.base_key().bytes.len();
        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
    }

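    /// Returns the total size of the keys and values currently held by the view.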
    pub fn total_size(&self) -> SizeData {
        self.total_size
    }

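    /// Applies the function `f` to each index (aka key), in lexicographic order, merging
    /// staged updates with the entries already in storage and skipping deleted ones.
    /// Iteration stops as soon as `f` returns `false`.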
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
            {
                loop {
                    match update {
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

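    /// Applies the function `f` to each index (aka key), in lexicographic order.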
    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_while(|key| {
            f(key)?;
            Ok(true)
        })
        .await
    }

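    /// Applies the function `f` to each key/value pair, in lexicographic order of keys,
    /// merging staged updates with the entries already in storage and skipping deleted
    /// ones. Iteration stops as soon as `f` returns `false`.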
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
            {
                let (index, index_val) = entry;
                loop {
                    match update {
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

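    /// Applies the function `f` to each key/value pair, in lexicographic order of keys.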
    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_value_while(|key, value| {
            f(key, value)?;
            Ok(true)
        })
        .await
    }

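    /// Returns the list of indices (aka keys) of the view, in lexicographic order.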
    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
        let mut indices = Vec::new();
        self.for_each_index(|index| {
            indices.push(index.to_vec());
            Ok(())
        })
        .await?;
        Ok(indices)
    }

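    /// Returns the list of key/value pairs of the view, in lexicographic order of keys.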
    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let mut index_values = Vec::new();
        self.for_each_index_value(|index, value| {
            index_values.push((index.to_vec(), value.to_vec()));
            Ok(())
        })
        .await?;
        Ok(index_values)
    }

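    /// Returns the number of entries in the view.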
    pub async fn count(&self) -> Result<usize, ViewError> {
        let mut count = 0;
        self.for_each_index(|_index| {
            count += 1;
            Ok(())
        })
        .await?;
        Ok(count)
    }

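    /// Returns the value associated with the given index, or `None` if there is none.
    /// Fails with [`ViewError::KeyTooLong`] if the index exceeds the maximum key size.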
    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let value = match update {
                Update::Removed => None,
                Update::Set(value) => Some(value.clone()),
            };
            return Ok(value);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(None);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().read_value_bytes(&key).await?)
    }

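    /// Tests whether the view contains the given index.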
    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let test = match update {
                Update::Removed => false,
                Update::Set(_value) => true,
            };
            return Ok(test);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(false);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().contains_key(&key).await?)
    }

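    /// Tests whether the view contains each of the given indices, returning one boolean
    /// per index. Indices not covered by staged updates are checked against storage in a
    /// single batched query.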
    pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
        let mut results = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(index) {
                let value = match update {
                    Update::Removed => false,
                    Update::Set(_) => true,
                };
                results.push(value);
            } else {
                results.push(false);
                if !self.deletion_set.contains_prefix_of(index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    vector_query.push(key);
                }
            }
        }
        let values = self.context.store().contains_keys(&vector_query).await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            results[i] = value;
        }
        Ok(results)
    }

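    /// Returns the values associated with the given indices (or `None` where absent),
    /// resolving staged updates first and reading the remaining indices from storage in
    /// a single batched query.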
    pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
        let mut result = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(index) {
                let value = match update {
                    Update::Removed => None,
                    Update::Set(value) => Some(value.clone()),
                };
                result.push(value);
            } else {
                result.push(None);
                if !self.deletion_set.contains_prefix_of(index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    vector_query.push(key);
                }
            }
        }
        let values = self
            .context
            .store()
            .read_multi_values_bytes(&vector_query)
            .await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            result[i] = value;
        }
        Ok(result)
    }

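    /// Applies the operations of the given batch (`Put`, `Delete` and `DeletePrefix`) to
    /// the view, updating the tracked sizes and invalidating the cached hash. The changes
    /// remain staged in memory until the view is saved.
    ///
    /// A minimal sketch of building and applying a batch; the surrounding view setup is
    /// assumed to exist:
    ///
    /// ```ignore
    /// let mut batch = Batch::new();
    /// batch.put_key_value_bytes(vec![1, 2, 3], vec![4, 5]);
    /// batch.delete_key(vec![9, 9]);
    /// batch.delete_key_prefix(vec![7]);
    /// view.write_batch(batch).await?;
    /// ```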
    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
        *self.hash.get_mut().unwrap() = None;
        let max_key_size = self.max_key_size();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.remove(key.clone());
                    if self.deletion_set.contains_prefix_of(&key) {
                        self.updates.remove(&key);
                    } else {
                        self.updates.insert(key, Update::Removed);
                    }
                }
                WriteOperation::Put { key, value } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    let entry_size = SizeData {
                        key: key.len() as u32,
                        value: value.len() as u32,
                    };
                    self.total_size.add_assign(entry_size)?;
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.insert(key.clone(), entry_size.value);
                    self.updates.insert(key, Update::Set(value));
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
                    let key_list = self
                        .updates
                        .range(get_key_range_for_prefix(key_prefix.clone()))
                        .map(|x| x.0.to_vec())
                        .collect::<Vec<_>>();
                    for key in key_list {
                        self.updates.remove(&key);
                    }
                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
                    for (key, value) in key_values {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                        self.sizes.remove(key);
                    }
                    self.sizes.remove_by_prefix(key_prefix.clone());
                    self.deletion_set.insert_key_prefix(key_prefix);
                }
            }
        }
        Ok(())
    }

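    /// Inserts or overwrites the value at the given index.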
    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(index, value);
        self.write_batch(batch).await
    }

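    /// Removes the entry at the given index, if any.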
    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key(index);
        self.write_batch(batch).await
    }

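    /// Removes all entries whose key starts with the given prefix.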
    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key_prefix(key_prefix);
        self.write_batch(batch).await
    }

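    /// Returns the keys matching the given prefix, with the prefix stripped, in
    /// lexicographic order, taking staged updates and deletions into account.
    ///
    /// A small sketch of the expected behavior (assuming `view` is a loaded view):
    ///
    /// ```ignore
    /// view.insert(vec![0, 1], vec![10]).await?;
    /// view.insert(vec![0, 2], vec![20]).await?;
    /// view.insert(vec![1, 0], vec![30]).await?;
    /// assert_eq!(view.find_keys_by_prefix(&[0]).await?, vec![vec![1], vec![2]]);
    /// ```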
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }

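    /// Returns the key/value pairs whose key matches the given prefix, with the prefix
    /// stripped from the keys, in lexicographic order.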
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                loop {
                    match update {
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }

    /// Computes the hash of all key/value pairs of the view, in lexicographic order of keys.
    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
        let mut hasher = sha3::Sha3_256::default();
        let mut count = 0u32;
        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
            count += 1;
            hasher.update_with_bytes(index)?;
            hasher.update_with_bytes(value)?;
            Ok(())
        })
        .await?;
        hasher.update_with_bcs_bytes(&count)?;
        Ok(hasher.finalize())
    }
}

impl<C: Context> HashableView for KeyValueStoreView<C> {
    type Hasher = sha3::Sha3_256;

    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        let hash = *self.hash.get_mut().unwrap();
        match hash {
            Some(hash) => Ok(hash),
            None => {
                let new_hash = self.compute_hash().await?;
                let hash = self.hash.get_mut().unwrap();
                *hash = Some(new_hash);
                Ok(new_hash)
            }
        }
    }

    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        let hash = *self.hash.lock().unwrap();
        match hash {
            Some(hash) => Ok(hash),
            None => {
                let new_hash = self.compute_hash().await?;
                let mut hash = self.hash.lock().unwrap();
                *hash = Some(new_hash);
                Ok(new_hash)
            }
        }
    }
}

/// Type wrapping [`KeyValueStoreView`] while memoizing the hash.
pub type HashedKeyValueStoreView<C> =
    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;

/// Type wrapping [`KeyValueStoreView`] with historical hashing.
pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;

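/// A (readable and writable) key-value store backed by a [`KeyValueStoreView`], used for tests.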
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}

#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    type Error = ViewContainerError;
}

#[cfg(with_testing)]
#[derive(Error, Debug)]
pub enum ViewContainerError {
    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}

#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    const BACKEND: &'static str = "view_container";
}

#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        1
    }

    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
        Ok(Vec::new())
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        let view = self.view.read().await;
        Ok(view.get(key).await?)
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        let view = self.view.read().await;
        Ok(view.contains_key(key).await?)
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
        let view = self.view.read().await;
        Ok(view.contains_keys(keys).await?)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        let view = self.view.read().await;
        Ok(view.multi_get(keys).await?)
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        let view = self.view.read().await;
        Ok(view.find_keys_by_prefix(key_prefix).await?)
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        let view = self.view.read().await;
        Ok(view.find_key_values_by_prefix(key_prefix).await?)
    }
}

#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut view = self.view.write().await;
        view.write_batch(batch).await?;
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        view.post_save();
        view.context()
            .store()
            .write_batch(batch)
            .await
            .map_err(ViewError::from)?;
        Ok(())
    }

    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}

#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let view = KeyValueStoreView::load(context).await?;
        let view = Arc::new(RwLock::new(view));
        Ok(Self { view })
    }
}