1use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24 batch::{Batch, WriteOperation},
25 common::{
26 from_bytes_option, from_bytes_option_or_default, get_interval, get_upper_bound,
27 DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28 },
29 context::Context,
30 map_view::ByteMapView,
31 store::{KeyIterable, KeyValueIterable, ReadableKeyValueStore},
32 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
33};
34
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// The latency of hash computations over a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// The latency of `get` operations on a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// The latency of `multi_get` operations on a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of `contains_key` operations on a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of `contains_keys` operations on a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of `find_keys_by_prefix` operations on a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of `find_key_values_by_prefix` operations on a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of `write_batch` operations on a `KeyValueStoreView`.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });
}
128
129#[cfg(with_testing)]
130use {
131 crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
132 async_lock::RwLock,
133 std::sync::Arc,
134 thiserror::Error,
135};
136
/// Tags distinguishing the sub-keys that a [`KeyValueStoreView`] writes under its base key.
#[repr(u8)]
enum KeyTag {
    /// Prefix under which the user's key/value entries are stored.
    Index = MIN_VIEW_TAG,
    /// Key under which the serialized total size (`SizeData`) is stored.
    TotalSize,
    /// Prefix for the embedded `sizes` map (per-entry value sizes).
    Sizes,
    /// Key under which the cached hash is stored.
    Hash,
}
148
/// The byte sizes of the keys and values of a set of entries, tracked separately.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SizeData {
    /// The total size of the keys, in bytes.
    pub key: u32,
    /// The total size of the values, in bytes.
    pub value: u32,
}
157
158impl SizeData {
159 pub fn sum(&mut self) -> u32 {
161 self.key + self.value
162 }
163
164 pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
166 self.key = self
167 .key
168 .checked_add(size.key)
169 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
170 self.value = self
171 .value
172 .checked_add(size.value)
173 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
174 Ok(())
175 }
176
177 pub fn sub_assign(&mut self, size: SizeData) {
179 self.key -= size.key;
180 self.value -= size.value;
181 }
182}
183
/// A view that mimics a key-value store, buffering updates in memory until flushed.
#[derive(Debug)]
pub struct KeyValueStoreView<C> {
    /// The storage context giving access to the backing store and base key.
    context: C,
    /// Deleted prefixes, plus the flag requesting full deletion of stored state.
    deletion_set: DeletionSet,
    /// Pending entry updates (sets and removals), ordered by index.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// The total size as last persisted to storage.
    stored_total_size: SizeData,
    /// The total size including all pending updates.
    total_size: SizeData,
    /// Map from entry index to the byte size of its value.
    sizes: ByteMapView<C, u32>,
    /// The hash as last persisted to storage, if any.
    stored_hash: Option<HasherOutput>,
    /// Cached hash of the current contents; reset to `None` on mutation.
    hash: Mutex<Option<HasherOutput>>,
}
212
impl<C: Context> View for KeyValueStoreView<C> {
    /// Hash key + total-size key, plus whatever the embedded `sizes` map needs.
    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    /// Lists the keys to read at load time: hash first, total size second,
    /// then the keys of the embedded `sizes` map. `post_load` relies on this
    /// exact layout.
    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
        let mut v = vec![key_hash, key_total_size];
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
        Ok(v)
    }

    /// Reconstructs the view from the values read for `pre_load`'s keys, in
    /// the same order: `values[0]` = hash, `values[1]` = total size (defaulted
    /// if absent), `values[2..]` = the `sizes` map's values.
    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        let total_size =
            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        let sizes = ByteMapView::post_load(
            context_sizes,
            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
        )?;
        Ok(Self {
            context,
            deletion_set: DeletionSet::new(),
            updates: BTreeMap::new(),
            stored_total_size: total_size,
            total_size,
            sizes,
            stored_hash: hash,
            hash: Mutex::new(hash),
        })
    }

    async fn load(context: C) -> Result<Self, ViewError> {
        let keys = Self::pre_load(&context)?;
        let values = context.store().read_multi_values_bytes(keys).await?;
        Self::post_load(context, &values)
    }

    /// Discards all pending changes, restoring the last persisted state.
    fn rollback(&mut self) {
        self.deletion_set.rollback();
        self.updates.clear();
        self.total_size = self.stored_total_size;
        self.sizes.rollback();
        *self.hash.get_mut().unwrap() = self.stored_hash;
    }

    async fn has_pending_changes(&self) -> bool {
        if self.deletion_set.has_pending_changes() {
            return true;
        }
        if !self.updates.is_empty() {
            return true;
        }
        if self.stored_total_size != self.total_size {
            return true;
        }
        if self.sizes.has_pending_changes().await {
            return true;
        }
        let hash = self.hash.lock().unwrap();
        self.stored_hash != *hash
    }

    /// Serializes the pending changes into `batch` and marks them as stored.
    /// Returns `true` if the whole view ends up empty (so the caller may drop it).
    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.deletion_set.delete_storage_first {
            // Clear everything under the base key, then re-insert any pending sets.
            delete_view = true;
            self.stored_total_size = SizeData::default();
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            for (index, update) in mem::take(&mut self.updates) {
                if let Update::Set(value) = update {
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    batch.put_key_value_bytes(key, value);
                    // At least one entry survives, so the view is not empty.
                    delete_view = false;
                }
            }
            self.stored_hash = None
        } else {
            // Apply prefix deletions first, then individual updates.
            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                batch.delete_key_prefix(key);
            }
            for (index, update) in mem::take(&mut self.updates) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                match update {
                    Update::Removed => batch.delete_key(key),
                    Update::Set(value) => batch.put_key_value_bytes(key, value),
                }
            }
        }
        self.sizes.flush(batch)?;
        // Persist the hash and total size only when they changed.
        let hash = *self.hash.get_mut().unwrap();
        if self.stored_hash != hash {
            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
            match hash {
                None => batch.delete_key(key),
                Some(hash) => batch.put_key_value(key, &hash)?,
            }
            self.stored_hash = hash;
        }
        if self.stored_total_size != self.total_size {
            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
            batch.put_key_value(key, &self.total_size)?;
            self.stored_total_size = self.total_size;
        }
        self.deletion_set.delete_storage_first = false;
        Ok(delete_view)
    }

    /// Marks all contents for deletion on the next flush.
    fn clear(&mut self) {
        self.deletion_set.clear();
        self.updates.clear();
        self.total_size = SizeData::default();
        self.sizes.clear();
        *self.hash.get_mut().unwrap() = None;
    }
}
348
349impl<C: Context> ClonableView for KeyValueStoreView<C> {
350 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
351 Ok(KeyValueStoreView {
352 context: self.context.clone(),
353 deletion_set: self.deletion_set.clone(),
354 updates: self.updates.clone(),
355 stored_total_size: self.stored_total_size,
356 total_size: self.total_size,
357 sizes: self.sizes.clone_unchecked()?,
358 stored_hash: self.stored_hash,
359 hash: Mutex::new(*self.hash.get_mut().unwrap()),
360 })
361 }
362}
363
impl<C: Context> KeyValueStoreView<C> {
    /// Maximum length of a user-supplied index: the store's key limit minus
    /// the base-key prefix and the one-byte `KeyTag`.
    fn max_key_size(&self) -> usize {
        let prefix_len = self.context.base_key().bytes.len();
        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
    }

    /// Returns the total sizes of the view's keys and values, including
    /// pending (unflushed) updates.
    pub fn total_size(&self) -> SizeData {
        self.total_size
    }

    /// Applies `f` to each index, in lexicographic order, merging in-memory
    /// updates with stored entries and skipping deleted prefixes. Stops early
    /// when `f` returns `Ok(false)`.
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
                .iterator()
            {
                let index = index?;
                // Merge step: emit pending updates that sort before (or equal
                // to) the stored index, then the stored index itself unless it
                // is shadowed by an update or a deleted prefix.
                loop {
                    match update {
                        Some((key, value)) if key.as_slice() <= index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == index {
                                break;
                            }
                        }
                        _ => {
                            if !suffix_closed_set.find_key(index) && !f(index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Remaining updates sort after every stored index.
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies `f` to each index, in lexicographic order.
    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_while(|key| {
            f(key)?;
            Ok(true)
        })
        .await
    }

    /// Applies `f` to each index/value pair, in lexicographic order of
    /// indices, stopping early when `f` returns `Ok(false)`. Same merge logic
    /// as `for_each_index_while`.
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
                .iterator()
            {
                let (index, index_val) = entry?;
                loop {
                    match update {
                        Some((key, value)) if key.as_slice() <= index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == index {
                                break;
                            }
                        }
                        _ => {
                            if !suffix_closed_set.find_key(index) && !f(index, index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies `f` to each index/value pair, in lexicographic order of indices.
    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_value_while(|key, value| {
            f(key, value)?;
            Ok(true)
        })
        .await
    }

    /// Returns all indices of the view, in lexicographic order.
    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
        let mut indices = Vec::new();
        self.for_each_index(|index| {
            indices.push(index.to_vec());
            Ok(())
        })
        .await?;
        Ok(indices)
    }

    /// Returns all index/value pairs of the view, in lexicographic order of indices.
    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let mut index_values = Vec::new();
        self.for_each_index_value(|index, value| {
            index_values.push((index.to_vec(), value.to_vec()));
            Ok(())
        })
        .await?;
        Ok(index_values)
    }

    /// Returns the number of entries, counting pending updates.
    pub async fn count(&self) -> Result<usize, ViewError> {
        let mut count = 0;
        self.for_each_index(|_index| {
            count += 1;
            Ok(())
        })
        .await?;
        Ok(count)
    }

    /// Returns the value at `index`, if any, consulting pending updates first,
    /// then deleted prefixes, then the backing store.
    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let value = match update {
                Update::Removed => None,
                Update::Set(value) => Some(value.clone()),
            };
            return Ok(value);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(None);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().read_value_bytes(&key).await?)
    }

    /// Tests whether the view contains `index`, consulting pending updates
    /// first, then deleted prefixes, then the backing store.
    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let test = match update {
                Update::Removed => false,
                Update::Set(_value) => true,
            };
            return Ok(test);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(false);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().contains_key(&key).await?)
    }

    /// Tests whether the view contains each of `indices`, returning one bool
    /// per index in order. Only indices unresolved in memory hit the store.
    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
        let mut results = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                let value = match update {
                    Update::Removed => false,
                    Update::Set(_) => true,
                };
                results.push(value);
            } else {
                // Placeholder; overwritten below if the store is consulted.
                results.push(false);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self.context.store().contains_keys(vector_query).await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            results[i] = value;
        }
        Ok(results)
    }

    /// Returns the value (if any) for each of `indices`, in order. Only
    /// indices unresolved in memory are fetched from the store, in one
    /// multi-read.
    pub async fn multi_get(
        &self,
        indices: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
        let mut result = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                let value = match update {
                    Update::Removed => None,
                    Update::Set(value) => Some(value.clone()),
                };
                result.push(value);
            } else {
                // Placeholder; overwritten below if the store is consulted.
                result.push(None);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self
            .context
            .store()
            .read_multi_values_bytes(vector_query)
            .await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            result[i] = value;
        }
        Ok(result)
    }

    /// Applies `batch` to the in-memory state (nothing is persisted until
    /// `flush`), keeping `total_size` and the `sizes` map consistent and
    /// invalidating the cached hash.
    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
        // Any mutation invalidates the cached hash.
        *self.hash.get_mut().unwrap() = None;
        let max_key_size = self.max_key_size();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.remove(key.clone());
                    if self.deletion_set.contains_prefix_of(&key) {
                        // Already covered by a deleted prefix; no tombstone needed.
                        self.updates.remove(&key);
                    } else {
                        self.updates.insert(key, Update::Removed);
                    }
                }
                WriteOperation::Put { key, value } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    // NOTE(review): these `as u32` casts are bounded by
                    // `max_key_size` / the store's value limits in practice,
                    // but the Delete branch uses `u32::try_from` — confirm the
                    // inconsistency is intentional.
                    let entry_size = SizeData {
                        key: key.len() as u32,
                        value: value.len() as u32,
                    };
                    // New size is added before the old size is subtracted, so a
                    // transient overflow is possible near `u32::MAX` even if
                    // the net change is small.
                    self.total_size.add_assign(entry_size)?;
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.insert(key.clone(), entry_size.value);
                    self.updates.insert(key, Update::Set(value));
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
                    // Drop pending updates shadowed by the deleted prefix.
                    let key_list = self
                        .updates
                        .range(get_interval(key_prefix.clone()))
                        .map(|x| x.0.to_vec())
                        .collect::<Vec<_>>();
                    for key in key_list {
                        self.updates.remove(&key);
                    }
                    // Subtract the sizes of all entries under the prefix.
                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
                    for (key, value) in key_values {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                        self.sizes.remove(key);
                    }
                    self.sizes.remove_by_prefix(key_prefix.clone());
                    self.deletion_set.insert_key_prefix(key_prefix);
                }
            }
        }
        Ok(())
    }

    /// Sets the value of `index` (in memory until flushed).
    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(index, value);
        self.write_batch(batch).await
    }

    /// Removes the entry at `index`, if any (in memory until flushed).
    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key(index);
        self.write_batch(batch).await
    }

    /// Removes all entries whose index starts with `key_prefix` (in memory
    /// until flushed).
    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key_prefix(key_prefix);
        self.write_batch(batch).await
    }

    /// Returns the keys matching `key_prefix`, with the prefix stripped, in
    /// lexicographic order, merging pending updates with stored keys.
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // `key` below is the stored key with `key_prefix_full` already
            // stripped; updates carry the full user index, hence the `[len..]`
            // slicing when comparing.
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
                .iterator()
            {
                let key = key?;
                loop {
                    match update {
                        Some((update_key, update_value)) if &update_key[len..] <= key => {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // Reconstruct the full index to test deleted prefixes.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key.to_vec());
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }

    /// Returns the key/value pairs matching `key_prefix`, with the prefix
    /// stripped from the keys, in lexicographic order of keys, merging pending
    /// updates with stored entries.
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
                .into_iterator_owned()
            {
                let (key, value) = entry?;
                loop {
                    match update {
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // Reconstruct the full index to test deleted prefixes.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }

    /// Hashes all index/value pairs in order, followed by the entry count.
    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
        let mut hasher = sha3::Sha3_256::default();
        let mut count = 0u32;
        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
            count += 1;
            hasher.update_with_bytes(index)?;
            hasher.update_with_bytes(value)?;
            Ok(())
        })
        .await?;
        hasher.update_with_bcs_bytes(&count)?;
        Ok(hasher.finalize())
    }
}
1154
1155impl<C: Context> HashableView for KeyValueStoreView<C> {
1156 type Hasher = sha3::Sha3_256;
1157
1158 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1159 let hash = *self.hash.get_mut().unwrap();
1160 match hash {
1161 Some(hash) => Ok(hash),
1162 None => {
1163 let new_hash = self.compute_hash().await?;
1164 let hash = self.hash.get_mut().unwrap();
1165 *hash = Some(new_hash);
1166 Ok(new_hash)
1167 }
1168 }
1169 }
1170
1171 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1172 let hash = *self.hash.lock().unwrap();
1173 match hash {
1174 Some(hash) => Ok(hash),
1175 None => {
1176 let new_hash = self.compute_hash().await?;
1177 let mut hash = self.hash.lock().unwrap();
1178 *hash = Some(new_hash);
1179 Ok(new_hash)
1180 }
1181 }
1182 }
1183}
1184
#[cfg(with_testing)]
/// A testing-only key-value store implemented on top of a shared
/// [`KeyValueStoreView`].
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    /// The wrapped view, shared behind an async read/write lock.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1191
#[cfg(with_testing)]
// Store operations on `ViewContainer` report `ViewContainerError`.
impl<C> WithError for ViewContainer<C> {
    type Error = ViewContainerError;
}
1196
#[cfg(with_testing)]
/// The error type of [`ViewContainer`] store operations.
#[derive(Error, Debug)]
pub enum ViewContainerError {
    /// An error from the underlying view.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1209
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    // Identifies the backend in error reporting.
    const BACKEND: &'static str = "view_container";
}
1214
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
    type Keys = Vec<Vec<u8>>;
    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;

    fn max_stream_queries(&self) -> usize {
        1
    }

    /// Delegates to [`KeyValueStoreView::get`] under a read lock.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        Ok(self.view.read().await.get(key).await?)
    }

    /// Delegates to [`KeyValueStoreView::contains_key`] under a read lock.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        Ok(self.view.read().await.contains_key(key).await?)
    }

    /// Delegates to [`KeyValueStoreView::contains_keys`] under a read lock.
    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
        Ok(self.view.read().await.contains_keys(keys).await?)
    }

    /// Delegates to [`KeyValueStoreView::multi_get`] under a read lock.
    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        Ok(self.view.read().await.multi_get(keys).await?)
    }

    /// Delegates to [`KeyValueStoreView::find_keys_by_prefix`] under a read lock.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::Keys, ViewContainerError> {
        Ok(self.view.read().await.find_keys_by_prefix(key_prefix).await?)
    }

    /// Delegates to [`KeyValueStoreView::find_key_values_by_prefix`] under a read lock.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::KeyValues, ViewContainerError> {
        Ok(self
            .view
            .read()
            .await
            .find_key_values_by_prefix(key_prefix)
            .await?)
    }
}
1264
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies `batch` to the wrapped view, then immediately flushes the
    /// view's pending state to its backing store.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut view = self.view.write().await;
        view.write_batch(batch).await?;
        // Serialize the pending changes into a fresh batch for the backing store.
        let mut flush_batch = Batch::new();
        view.flush(&mut flush_batch)?;
        view.context()
            .store()
            .write_batch(flush_batch)
            .await
            .map_err(ViewError::from)?;
        Ok(())
    }

    /// No journal is kept for this test store; nothing to clear.
    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}
1286
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Creates a [`ViewContainer`] by loading a [`KeyValueStoreView`] from `context`.
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let view = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(view)),
        })
    }
}