1use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24 batch::{Batch, WriteOperation},
25 common::{
26 from_bytes_option, from_bytes_option_or_default, get_interval, get_upper_bound,
27 DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28 },
29 context::Context,
30 map_view::ByteMapView,
31 store::ReadableKeyValueStore,
32 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
33};
34
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a latency histogram with no labels and the exponential latency
    /// buckets (base 5.0) shared by all `KeyValueStoreView` metrics.
    ///
    /// All the statics below were registered with exactly these arguments; the
    /// helper removes the repeated boilerplate.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(5.0))
    }

    /// Latency of `KeyValueStoreView` hash computations.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
        )
    });

    /// Latency of `KeyValueStoreView::get`.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
        )
    });

    /// Latency of `KeyValueStoreView::multi_get`.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
            )
        });

    /// Latency of `KeyValueStoreView::contains_key`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
            )
        });

    /// Latency of `KeyValueStoreView::contains_keys`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
            )
        });

    /// Latency of `KeyValueStoreView::find_keys_by_prefix`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
            )
        });

    /// Latency of `KeyValueStoreView::find_key_values_by_prefix`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
            )
        });

    /// Latency of `KeyValueStoreView::write_batch`.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            latency_histogram(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
            )
        });
}
128
129#[cfg(with_testing)]
130use {
131 crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
132 async_lock::RwLock,
133 std::sync::Arc,
134 thiserror::Error,
135};
136
/// Key tags used to disambiguate the different kinds of keys that this view
/// stores under its base key. `Index` starts at `MIN_VIEW_TAG`, the following
/// variants take the next `u8` values.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the user-inserted key/value entries.
    Index = MIN_VIEW_TAG,
    /// Prefix for the serialized total [`SizeData`] of the view.
    TotalSize,
    /// Prefix for the inner `sizes` [`ByteMapView`] tracking per-entry value sizes.
    Sizes,
    /// Prefix for the stored hash of the view.
    Hash,
}
148
/// The size of a key/value entry, tracked separately for the key bytes and the
/// value bytes.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SizeData {
    /// Size of the key, in bytes.
    pub key: u32,
    /// Size of the value, in bytes.
    pub value: u32,
}
157
158impl SizeData {
159 pub fn sum(&mut self) -> u32 {
161 self.key + self.value
162 }
163
164 pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
166 self.key = self
167 .key
168 .checked_add(size.key)
169 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
170 self.value = self
171 .value
172 .checked_add(size.value)
173 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
174 Ok(())
175 }
176
177 pub fn sub_assign(&mut self, size: SizeData) {
179 self.key -= size.key;
180 self.value -= size.value;
181 }
182}
183
/// A view that mimics a key-value store inside another key-value store, with
/// deletions and insertions staged in memory until `flush`.
#[derive(Debug)]
pub struct KeyValueStoreView<C> {
    /// The storage context, providing the base key and the underlying store.
    context: C,
    /// Staged key-prefix deletions (including full clears).
    deletion_set: DeletionSet,
    /// Staged per-key insertions/removals, keyed by the un-prefixed index.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// Total size as currently persisted in storage.
    stored_total_size: SizeData,
    /// Total size including the staged changes.
    total_size: SizeData,
    /// Per-key value sizes, used to maintain `total_size` incrementally.
    sizes: ByteMapView<C, u32>,
    /// Hash as currently persisted in storage, if any.
    stored_hash: Option<HasherOutput>,
    /// Cached hash of the current (staged) state; `None` when stale.
    hash: Mutex<Option<HasherOutput>>,
}
212
impl<C: Context> View for KeyValueStoreView<C> {
    /// Two extra keys (hash and total size) plus whatever the inner `sizes`
    /// map needs for its own initialization.
    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    /// Returns the storage keys to read before `post_load`: the stored hash,
    /// the stored total size, then the pre-load keys of the `sizes` sub-view.
    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
        let mut v = vec![key_hash, key_total_size];
        // The `sizes` sub-view lives under its own `KeyTag::Sizes` prefix.
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
        Ok(v)
    }

    /// Reconstructs the view from the values read for `pre_load`'s keys, in
    /// the same order (hash, total size, then the `sizes` values).
    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        let total_size =
            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        let sizes = ByteMapView::post_load(
            context_sizes,
            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
        )?;
        Ok(Self {
            context,
            deletion_set: DeletionSet::new(),
            updates: BTreeMap::new(),
            // Freshly loaded: staged state equals stored state.
            stored_total_size: total_size,
            total_size,
            sizes,
            stored_hash: hash,
            hash: Mutex::new(hash),
        })
    }

    /// Loads the view by issuing one multi-read for all initialization keys.
    async fn load(context: C) -> Result<Self, ViewError> {
        let keys = Self::pre_load(&context)?;
        let values = context.store().read_multi_values_bytes(keys).await?;
        Self::post_load(context, &values)
    }

    /// Discards every staged change, restoring the last persisted state.
    fn rollback(&mut self) {
        self.deletion_set.rollback();
        self.updates.clear();
        self.total_size = self.stored_total_size;
        self.sizes.rollback();
        *self.hash.get_mut().unwrap() = self.stored_hash;
    }

    /// Returns whether any change is staged, in any of the staging areas
    /// (deletions, updates, total size, per-key sizes, or the cached hash).
    async fn has_pending_changes(&self) -> bool {
        if self.deletion_set.has_pending_changes() {
            return true;
        }
        if !self.updates.is_empty() {
            return true;
        }
        if self.stored_total_size != self.total_size {
            return true;
        }
        if self.sizes.has_pending_changes().await {
            return true;
        }
        let hash = self.hash.lock().unwrap();
        self.stored_hash != *hash
    }

    /// Writes the staged changes into `batch` and resets the staging state.
    ///
    /// Returns `true` only if the view was cleared and nothing was
    /// re-inserted afterwards, signalling the caller that the whole view may
    /// be deleted.
    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.deletion_set.delete_storage_first {
            // Full clear: wipe everything under the base key, then re-write
            // any entries that were set after the clear. Re-inserting any
            // entry means the view itself must survive.
            delete_view = true;
            self.stored_total_size = SizeData::default();
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            for (index, update) in mem::take(&mut self.updates) {
                if let Update::Set(value) = update {
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    batch.put_key_value_bytes(key, value);
                    delete_view = false;
                }
            }
            self.stored_hash = None
        } else {
            // Partial flush: apply staged prefix deletions first, then the
            // per-key updates (removals and insertions).
            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                batch.delete_key_prefix(key);
            }
            for (index, update) in mem::take(&mut self.updates) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                match update {
                    Update::Removed => batch.delete_key(key),
                    Update::Set(value) => batch.put_key_value_bytes(key, value),
                }
            }
        }
        self.sizes.flush(batch)?;
        // Persist the cached hash and the total size only when they changed.
        let hash = *self.hash.get_mut().unwrap();
        if self.stored_hash != hash {
            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
            match hash {
                None => batch.delete_key(key),
                Some(hash) => batch.put_key_value(key, &hash)?,
            }
            self.stored_hash = hash;
        }
        if self.stored_total_size != self.total_size {
            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
            batch.put_key_value(key, &self.total_size)?;
            self.stored_total_size = self.total_size;
        }
        self.deletion_set.delete_storage_first = false;
        Ok(delete_view)
    }

    /// Stages the removal of every entry and resets the in-memory state.
    fn clear(&mut self) {
        self.deletion_set.clear();
        self.updates.clear();
        self.total_size = SizeData::default();
        self.sizes.clear();
        *self.hash.get_mut().unwrap() = None;
    }
}
348
349impl<C: Context> ClonableView for KeyValueStoreView<C> {
350 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
351 Ok(KeyValueStoreView {
352 context: self.context.clone(),
353 deletion_set: self.deletion_set.clone(),
354 updates: self.updates.clone(),
355 stored_total_size: self.stored_total_size,
356 total_size: self.total_size,
357 sizes: self.sizes.clone_unchecked()?,
358 stored_hash: self.stored_hash,
359 hash: Mutex::new(*self.hash.get_mut().unwrap()),
360 })
361 }
362}
363
impl<C: Context> KeyValueStoreView<C> {
    /// Maximum length of a user key: the store's maximum key size minus one
    /// byte for the `KeyTag` and minus the length of the base-key prefix.
    fn max_key_size(&self) -> usize {
        let prefix_len = self.context.base_key().bytes.len();
        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
    }

    /// Returns the total size (keys plus values) of the view, including
    /// staged changes.
    pub fn total_size(&self) -> SizeData {
        self.total_size
    }

    /// Applies `f` to each index (key) of the view, in lexicographic order,
    /// stopping early as soon as `f` returns `Ok(false)`.
    ///
    /// Stored indices are merged with the staged `updates`; indices covered by
    /// a staged deletion or a deleted prefix are skipped.
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
            {
                // Emit staged updates that sort at or before the stored
                // index; a staged entry equal to the stored index shadows it.
                loop {
                    match update {
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            // Stored index, unless it falls under a deleted prefix.
                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Remaining staged updates past the last stored index.
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies `f` to each index of the view, in lexicographic order.
    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_while(|key| {
            f(key)?;
            Ok(true)
        })
        .await
    }

    /// Applies `f` to each index/value pair, in lexicographic order of the
    /// indices, stopping early as soon as `f` returns `Ok(false)`.
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
            {
                let (index, index_val) = entry;
                // Same merge logic as `for_each_index_while`, with values.
                loop {
                    match update {
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies `f` to each index/value pair, in lexicographic order of the
    /// indices.
    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_value_while(|key, value| {
            f(key, value)?;
            Ok(true)
        })
        .await
    }

    /// Returns all indices of the view, in lexicographic order.
    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
        let mut indices = Vec::new();
        self.for_each_index(|index| {
            indices.push(index.to_vec());
            Ok(())
        })
        .await?;
        Ok(indices)
    }

    /// Returns all index/value pairs of the view, in lexicographic order of
    /// the indices.
    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let mut index_values = Vec::new();
        self.for_each_index_value(|index, value| {
            index_values.push((index.to_vec(), value.to_vec()));
            Ok(())
        })
        .await?;
        Ok(index_values)
    }

    /// Returns the number of entries in the view.
    pub async fn count(&self) -> Result<usize, ViewError> {
        let mut count = 0;
        self.for_each_index(|_index| {
            count += 1;
            Ok(())
        })
        .await?;
        Ok(count)
    }

    /// Returns the value at `index`, if any, taking staged changes into
    /// account. Fails with `KeyTooLong` if the index exceeds `max_key_size`.
    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        // Staged updates shadow storage.
        if let Some(update) = self.updates.get(index) {
            let value = match update {
                Update::Removed => None,
                Update::Set(value) => Some(value.clone()),
            };
            return Ok(value);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(None);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().read_value_bytes(&key).await?)
    }

    /// Returns whether the view contains `index`, taking staged changes into
    /// account. Fails with `KeyTooLong` if the index exceeds `max_key_size`.
    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let test = match update {
                Update::Removed => false,
                Update::Set(_value) => true,
            };
            return Ok(test);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(false);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().contains_key(&key).await?)
    }

    /// Returns, for each of `indices`, whether the view contains it. Answers
    /// staged updates from memory and batches the remaining lookups into a
    /// single store query.
    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
        let mut results = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                let value = match update {
                    Update::Removed => false,
                    Update::Set(_) => true,
                };
                results.push(value);
            } else {
                // Placeholder; patched below from the store answer unless the
                // index is covered by a staged prefix deletion.
                results.push(false);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self.context.store().contains_keys(vector_query).await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            results[i] = value;
        }
        Ok(results)
    }

    /// Returns the values for `indices` (`None` for missing entries), using a
    /// single batched store read for the indices not answered from memory.
    pub async fn multi_get(
        &self,
        indices: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
        let mut result = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                let value = match update {
                    Update::Removed => None,
                    Update::Set(value) => Some(value.clone()),
                };
                result.push(value);
            } else {
                result.push(None);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self
            .context
            .store()
            .read_multi_values_bytes(vector_query)
            .await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            result[i] = value;
        }
        Ok(result)
    }

    /// Stages a batch of operations (puts, deletes, prefix deletes) into the
    /// view, maintaining `total_size` and the per-key `sizes` map, and
    /// invalidating the cached hash.
    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
        // Any write invalidates the cached hash.
        *self.hash.get_mut().unwrap() = None;
        let max_key_size = self.max_key_size();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    // Subtract the entry's size only if it currently exists.
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.remove(key.clone());
                    if self.deletion_set.contains_prefix_of(&key) {
                        // Already deleted by a staged prefix; no explicit
                        // `Removed` marker needed.
                        self.updates.remove(&key);
                    } else {
                        self.updates.insert(key, Update::Removed);
                    }
                }
                WriteOperation::Put { key, value } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    let entry_size = SizeData {
                        key: key.len() as u32,
                        value: value.len() as u32,
                    };
                    // Add the new size, then subtract the overwritten entry's
                    // old size, if any.
                    self.total_size.add_assign(entry_size)?;
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.insert(key.clone(), entry_size.value);
                    self.updates.insert(key, Update::Set(value));
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
                    // Drop staged updates under the prefix.
                    let key_list = self
                        .updates
                        .range(get_interval(key_prefix.clone()))
                        .map(|x| x.0.to_vec())
                        .collect::<Vec<_>>();
                    for key in key_list {
                        self.updates.remove(&key);
                    }
                    // Subtract the sizes of every entry under the prefix.
                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
                    for (key, value) in key_values {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                        self.sizes.remove(key);
                    }
                    self.sizes.remove_by_prefix(key_prefix.clone());
                    self.deletion_set.insert_key_prefix(key_prefix);
                }
            }
        }
        Ok(())
    }

    /// Inserts (or overwrites) a single key/value pair.
    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(index, value);
        self.write_batch(batch).await
    }

    /// Removes a single entry; a no-op if the entry does not exist.
    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key(index);
        self.write_batch(batch).await
    }

    /// Removes every entry whose index starts with `key_prefix`.
    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key_prefix(key_prefix);
        self.write_batch(batch).await
    }

    /// Returns the keys matching `key_prefix`, with the prefix stripped, in
    /// lexicographic order, merging storage with staged changes.
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        // Staged updates restricted to the prefix range.
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // Stored keys come back already stripped of `key_prefix_full`.
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        // Compare the staged key's suffix with the stored
                        // (already-stripped) key.
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // Deleted-prefix checks need the full (un-stripped) key.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }

    /// Returns the key/value pairs matching `key_prefix`, with the prefix
    /// stripped from the keys, in lexicographic order, merging storage with
    /// staged changes.
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                // Same merge logic as `find_keys_by_prefix`, with values.
                loop {
                    match update {
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }

    /// Computes the hash of the view's contents: every index/value pair in
    /// order, followed by the number of entries.
    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
        let mut hasher = sha3::Sha3_256::default();
        let mut count = 0u32;
        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
            count += 1;
            hasher.update_with_bytes(index)?;
            hasher.update_with_bytes(value)?;
            Ok(())
        })
        .await?;
        hasher.update_with_bcs_bytes(&count)?;
        Ok(hasher.finalize())
    }
}
1150
impl<C: Context> HashableView for KeyValueStoreView<C> {
    type Hasher = sha3::Sha3_256;

    /// Returns the cached hash, computing and caching it first if stale.
    /// Exclusive access lets us use `get_mut` instead of locking.
    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        let hash = *self.hash.get_mut().unwrap();
        match hash {
            Some(hash) => Ok(hash),
            None => {
                let new_hash = self.compute_hash().await?;
                let hash = self.hash.get_mut().unwrap();
                *hash = Some(new_hash);
                Ok(new_hash)
            }
        }
    }

    /// Returns the cached hash, computing and caching it first if stale.
    ///
    /// The mutex guard is dropped before the `await` on `compute_hash` and
    /// re-acquired afterwards; concurrent callers may both compute the hash,
    /// but the result is deterministic so the second write is harmless.
    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        let hash = *self.hash.lock().unwrap();
        match hash {
            Some(hash) => Ok(hash),
            None => {
                let new_hash = self.compute_hash().await?;
                let mut hash = self.hash.lock().unwrap();
                *hash = Some(new_hash);
                Ok(new_hash)
            }
        }
    }
}
1180
/// A testing-only key-value store backed by a shared [`KeyValueStoreView`].
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    /// The wrapped view, shared and synchronized across clones.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1187
// The error type reported by `ViewContainer` as a key-value store.
#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    type Error = ViewContainerError;
}
1192
/// Errors that can arise while using a [`ViewContainer`] as a key-value store.
#[cfg(with_testing)]
#[derive(Error, Debug)]
pub enum ViewContainerError {
    /// An error propagated from the underlying view.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1205
// Identifies the backend in store error reporting.
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    const BACKEND: &'static str = "view_container";
}
1210
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    // Inherit the key-size limit from the underlying store of the context.
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        1
    }

    /// Reads a value by delegating to the wrapped view under a read lock.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        let value = guard.get(key).await?;
        Ok(value)
    }

    /// Tests key membership by delegating to the wrapped view.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        let guard = self.view.read().await;
        let found = guard.contains_key(key).await?;
        Ok(found)
    }

    /// Tests membership of several keys by delegating to the wrapped view.
    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
        let guard = self.view.read().await;
        let found = guard.contains_keys(keys).await?;
        Ok(found)
    }

    /// Reads several values by delegating to the wrapped view.
    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        let guard = self.view.read().await;
        let values = guard.multi_get(keys).await?;
        Ok(values)
    }

    /// Finds keys under a prefix by delegating to the wrapped view.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        let keys = guard.find_keys_by_prefix(key_prefix).await?;
        Ok(keys)
    }

    /// Finds key/value pairs under a prefix by delegating to the wrapped view.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        let guard = self.view.read().await;
        let key_values = guard.find_key_values_by_prefix(key_prefix).await?;
        Ok(key_values)
    }
}
1258
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    // Inherit the value-size limit from the underlying store of the context.
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies `batch` to the wrapped view, then immediately flushes the
    /// resulting changes to the underlying store.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut guard = self.view.write().await;
        // Stage the caller's operations in the view.
        guard.write_batch(batch).await?;
        // Collect the staged changes into a fresh batch and persist it.
        let mut flush_batch = Batch::new();
        guard.flush(&mut flush_batch)?;
        guard
            .context()
            .store()
            .write_batch(flush_batch)
            .await
            .map_err(ViewError::from)?;
        Ok(())
    }

    /// Nothing to clear: writes are flushed synchronously in `write_batch`.
    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}
1280
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Creates a [`ViewContainer`] by loading a [`KeyValueStoreView`] from
    /// `context` and wrapping it for shared access.
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let inner = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(inner)),
        })
    }
}