1use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24 batch::{Batch, WriteOperation},
25 common::{
26 from_bytes_option, from_bytes_option_or_default, get_key_range_for_prefix, get_upper_bound,
27 DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28 },
29 context::Context,
30 hashable_wrapper::WrappedHashableContainerView,
31 historical_hash_wrapper::HistoricallyHashableView,
32 map_view::ByteMapView,
33 store::ReadableKeyValueStore,
34 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
35};
36
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus histograms tracking the latency of each `KeyValueStoreView` operation.
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Latency of hash computations.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// Latency of single-key reads.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// Latency of multi-key reads.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of single-key existence checks.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of multi-key existence checks.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of prefix key searches.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of prefix key-value searches.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of batch writes.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });
}
130
131#[cfg(with_testing)]
132use {
133 crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
134 async_lock::RwLock,
135 std::sync::Arc,
136 thiserror::Error,
137};
138
/// Key tags used to form the sub-keys of a `KeyValueStoreView` on top of the base key.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the indices (user keys) of the view.
    Index = MIN_VIEW_TAG,
    /// Prefix for the total stored size of the view.
    TotalSize,
    /// Prefix for the `sizes` map tracking each entry's value size.
    Sizes,
    /// Prefix for the stored hash.
    Hash,
}
150
/// A track keeper of the sizes of the keys and values of the entries.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SizeData {
    /// The total size of the keys, in bytes.
    pub key: u32,
    /// The total size of the values, in bytes.
    pub value: u32,
}
159
160impl SizeData {
161 pub fn sum(&mut self) -> u32 {
163 self.key + self.value
164 }
165
166 pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
168 self.key = self
169 .key
170 .checked_add(size.key)
171 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
172 self.value = self
173 .value
174 .checked_add(size.value)
175 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
176 Ok(())
177 }
178
179 pub fn sub_assign(&mut self, size: SizeData) {
181 self.key -= size.key;
182 self.value -= size.value;
183 }
184}
185
/// A view exposing the API of a low-level key-value store to higher-level code,
/// while buffering changes in memory until they are flushed.
#[derive(Debug)]
pub struct KeyValueStoreView<C> {
    /// The context carrying the base key and the underlying store.
    context: C,
    /// Tracks deleted prefixes and whether the whole storage must be cleared first.
    deletion_set: DeletionSet,
    /// In-memory insertions and removals not yet persisted, keyed by user index.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// The total size currently persisted in storage.
    stored_total_size: SizeData,
    /// The total size including pending updates.
    total_size: SizeData,
    /// Per-entry value sizes, keyed by user index.
    sizes: ByteMapView<C, u32>,
    /// The hash currently persisted in storage, if any.
    stored_hash: Option<HasherOutput>,
    /// The cached hash; reset to `None` whenever the view is mutated.
    hash: Mutex<Option<HasherOutput>>,
}
214
215impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
216 type Target = KeyValueStoreView<C2>;
217
218 async fn with_context(
219 &mut self,
220 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
221 ) -> Self::Target {
222 let hash = *self.hash.lock().unwrap();
223 KeyValueStoreView {
224 context: ctx.clone()(self.context()),
225 deletion_set: self.deletion_set.clone(),
226 updates: self.updates.clone(),
227 stored_total_size: self.stored_total_size,
228 total_size: self.total_size,
229 sizes: self.sizes.with_context(ctx.clone()).await,
230 stored_hash: self.stored_hash,
231 hash: Mutex::new(hash),
232 }
233 }
234}
235
impl<C: Context> View for KeyValueStoreView<C> {
    // Two extra keys (hash, total size) in addition to the keys of the `sizes` map.
    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    /// Returns the keys to read at load time: hash, total size, then the keys of `sizes`.
    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
        let mut v = vec![key_hash, key_total_size];
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
        Ok(v)
    }

    /// Reconstructs the view from the values read for `pre_load`, in the same order.
    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        let total_size =
            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        let sizes = ByteMapView::post_load(
            context_sizes,
            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
        )?;
        Ok(Self {
            context,
            deletion_set: DeletionSet::new(),
            updates: BTreeMap::new(),
            stored_total_size: total_size,
            total_size,
            sizes,
            stored_hash: hash,
            hash: Mutex::new(hash),
        })
    }

    /// Discards all in-memory changes, reverting to the stored state.
    fn rollback(&mut self) {
        self.deletion_set.rollback();
        self.updates.clear();
        self.total_size = self.stored_total_size;
        self.sizes.rollback();
        *self.hash.get_mut().unwrap() = self.stored_hash;
    }

    /// Returns whether any in-memory change would need to be flushed.
    async fn has_pending_changes(&self) -> bool {
        if self.deletion_set.has_pending_changes() {
            return true;
        }
        if !self.updates.is_empty() {
            return true;
        }
        if self.stored_total_size != self.total_size {
            return true;
        }
        if self.sizes.has_pending_changes().await {
            return true;
        }
        let hash = self.hash.lock().unwrap();
        self.stored_hash != *hash
    }

    /// Writes all pending changes into `batch`; returns `true` if the whole view
    /// ends up deleted from storage.
    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.deletion_set.delete_storage_first {
            // Clear everything under the base key, then re-insert surviving entries.
            delete_view = true;
            self.stored_total_size = SizeData::default();
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            for (index, update) in mem::take(&mut self.updates) {
                if let Update::Set(value) = update {
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    batch.put_key_value_bytes(key, value);
                    // At least one entry survives, so the view is not fully deleted.
                    delete_view = false;
                }
            }
            self.stored_hash = None
        } else {
            // Apply prefix deletions first, then individual updates.
            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                batch.delete_key_prefix(key);
            }
            for (index, update) in mem::take(&mut self.updates) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                match update {
                    Update::Removed => batch.delete_key(key),
                    Update::Set(value) => batch.put_key_value_bytes(key, value),
                }
            }
        }
        self.sizes.flush(batch)?;
        let hash = *self.hash.get_mut().unwrap();
        if self.stored_hash != hash {
            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
            match hash {
                None => batch.delete_key(key),
                Some(hash) => batch.put_key_value(key, &hash)?,
            }
            self.stored_hash = hash;
        }
        if self.stored_total_size != self.total_size {
            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
            batch.put_key_value(key, &self.total_size)?;
            self.stored_total_size = self.total_size;
        }
        self.deletion_set.delete_storage_first = false;
        Ok(delete_view)
    }

    /// Marks the whole view as cleared; takes effect at the next flush.
    fn clear(&mut self) {
        self.deletion_set.clear();
        self.updates.clear();
        self.total_size = SizeData::default();
        self.sizes.clear();
        *self.hash.get_mut().unwrap() = None;
    }
}
365
366impl<C: Context> ClonableView for KeyValueStoreView<C> {
367 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
368 Ok(KeyValueStoreView {
369 context: self.context.clone(),
370 deletion_set: self.deletion_set.clone(),
371 updates: self.updates.clone(),
372 stored_total_size: self.stored_total_size,
373 total_size: self.total_size,
374 sizes: self.sizes.clone_unchecked()?,
375 stored_hash: self.stored_hash,
376 hash: Mutex::new(*self.hash.get_mut().unwrap()),
377 })
378 }
379}
380
impl<C: Context> KeyValueStoreView<C> {
    /// Returns the maximum user-key size: the store's maximum key size minus the
    /// base-key prefix and the one-byte key tag.
    fn max_key_size(&self) -> usize {
        let prefix_len = self.context.base_key().bytes.len();
        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
    }

    /// Returns the total sizes that will be used for keys and values when stored.
    pub fn total_size(&self) -> SizeData {
        self.total_size
    }

    /// Applies the function `f` over all indices, merging stored keys with
    /// in-memory updates in order. If `f` returns `false`, the iteration stops
    /// prematurely.
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
            {
                loop {
                    match update {
                        // Pending updates that sort before (or at) the stored key
                        // are emitted first; a `Set` at the same key shadows it.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            // Stored key survives unless a deleted prefix covers it.
                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit the remaining in-memory insertions past the last stored key.
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies the function `f` over all indices.
    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_while(|key| {
            f(key)?;
            Ok(true)
        })
        .await
    }

    /// Applies the function `f` over all index/value pairs, merging stored
    /// entries with in-memory updates in order. If `f` returns `false`, the
    /// iteration stops prematurely.
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
            {
                let (index, index_val) = entry;
                loop {
                    match update {
                        // Same merge logic as `for_each_index_while`, but with values.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies the function `f` over all index/value pairs.
    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_value_while(|key, value| {
            f(key, value)?;
            Ok(true)
        })
        .await
    }

    /// Returns the list of indices in lexicographic order.
    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
        let mut indices = Vec::new();
        self.for_each_index(|index| {
            indices.push(index.to_vec());
            Ok(())
        })
        .await?;
        Ok(indices)
    }

    /// Returns the list of index/value pairs in lexicographic order.
    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let mut index_values = Vec::new();
        self.for_each_index_value(|index, value| {
            index_values.push((index.to_vec(), value.to_vec()));
            Ok(())
        })
        .await?;
        Ok(index_values)
    }

    /// Returns the number of entries.
    pub async fn count(&self) -> Result<usize, ViewError> {
        let mut count = 0;
        self.for_each_index(|_index| {
            count += 1;
            Ok(())
        })
        .await?;
        Ok(count)
    }

    /// Obtains the value at the given index, if any, consulting pending updates
    /// and deletions before reading from storage.
    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let value = match update {
                Update::Removed => None,
                Update::Set(value) => Some(value.clone()),
            };
            return Ok(value);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(None);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().read_value_bytes(&key).await?)
    }

    /// Tests whether the view contains the given index, consulting pending
    /// updates and deletions before reading from storage.
    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let test = match update {
                Update::Removed => false,
                Update::Set(_value) => true,
            };
            return Ok(test);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(false);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().contains_key(&key).await?)
    }

    /// Tests whether the view contains each of the given indices; results are
    /// returned in the same order as `indices`.
    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
        let mut results = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                let value = match update {
                    Update::Removed => false,
                    Update::Set(_) => true,
                };
                results.push(value);
            } else {
                // Default to `false`; positions not covered by pending state are
                // resolved with a single batched storage query below.
                results.push(false);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self.context.store().contains_keys(vector_query).await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            results[i] = value;
        }
        Ok(results)
    }

    /// Obtains the values of a range of indices; results are returned in the
    /// same order as `indices`.
    pub async fn multi_get(
        &self,
        indices: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
        let mut result = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                let value = match update {
                    Update::Removed => None,
                    Update::Set(value) => Some(value.clone()),
                };
                result.push(value);
            } else {
                // Default to `None`; unresolved positions are filled by one
                // batched storage read below.
                result.push(None);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self
            .context
            .store()
            .read_multi_values_bytes(vector_query)
            .await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            result[i] = value;
        }
        Ok(result)
    }

    /// Applies the given batch of operations to the in-memory state, keeping the
    /// size accounting (`total_size`, `sizes`) consistent. Invalidates the
    /// cached hash.
    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
        *self.hash.get_mut().unwrap() = None;
        let max_key_size = self.max_key_size();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    // Subtract the entry's size only if it currently exists.
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.remove(key.clone());
                    if self.deletion_set.contains_prefix_of(&key) {
                        // Already covered by a deleted prefix: no tombstone needed.
                        self.updates.remove(&key);
                    } else {
                        self.updates.insert(key, Update::Removed);
                    }
                }
                WriteOperation::Put { key, value } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    let entry_size = SizeData {
                        key: key.len() as u32,
                        value: value.len() as u32,
                    };
                    self.total_size.add_assign(entry_size)?;
                    // If the key already existed, remove its old size contribution.
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.insert(key.clone(), entry_size.value);
                    self.updates.insert(key, Update::Set(value));
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
                    // Drop all pending updates under the prefix.
                    let key_list = self
                        .updates
                        .range(get_key_range_for_prefix(key_prefix.clone()))
                        .map(|x| x.0.to_vec())
                        .collect::<Vec<_>>();
                    for key in key_list {
                        self.updates.remove(&key);
                    }
                    // Subtract the sizes of all entries under the prefix.
                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
                    for (key, value) in key_values {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                        self.sizes.remove(key);
                    }
                    self.sizes.remove_by_prefix(key_prefix.clone());
                    self.deletion_set.insert_key_prefix(key_prefix);
                }
            }
        }
        Ok(())
    }

    /// Sets or inserts a value.
    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(index, value);
        self.write_batch(batch).await
    }

    /// Removes a value. If absent, nothing is done.
    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key(index);
        self.write_batch(batch).await
    }

    /// Deletes a key prefix.
    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key_prefix(key_prefix);
        self.write_batch(batch).await
    }

    /// Returns the list of keys matching the given prefix, merging stored keys
    /// with pending updates. The returned keys are stripped of the prefix.
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // `key` is relative to the prefix; updates hold full user keys,
            // hence the `[len..]` slicing when comparing.
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }

    /// Returns the list of key/value pairs matching the given prefix, merging
    /// stored entries with pending updates. The returned keys are stripped of
    /// the prefix.
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                loop {
                    match update {
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }

    /// Computes the hash over all index/value pairs plus the entry count.
    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
        let mut hasher = sha3::Sha3_256::default();
        let mut count = 0u32;
        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
            count += 1;
            hasher.update_with_bytes(index)?;
            hasher.update_with_bytes(value)?;
            Ok(())
        })
        .await?;
        hasher.update_with_bcs_bytes(&count)?;
        Ok(hasher.finalize())
    }
}
1167
1168impl<C: Context> HashableView for KeyValueStoreView<C> {
1169 type Hasher = sha3::Sha3_256;
1170
1171 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1172 let hash = *self.hash.get_mut().unwrap();
1173 match hash {
1174 Some(hash) => Ok(hash),
1175 None => {
1176 let new_hash = self.compute_hash().await?;
1177 let hash = self.hash.get_mut().unwrap();
1178 *hash = Some(new_hash);
1179 Ok(new_hash)
1180 }
1181 }
1182 }
1183
1184 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1185 let hash = *self.hash.lock().unwrap();
1186 match hash {
1187 Some(hash) => Ok(hash),
1188 None => {
1189 let new_hash = self.compute_hash().await?;
1190 let mut hash = self.hash.lock().unwrap();
1191 *hash = Some(new_hash);
1192 Ok(new_hash)
1193 }
1194 }
1195 }
1196}
1197
/// A `KeyValueStoreView` wrapped so that its container hash is kept up to date.
pub type HashedKeyValueStoreView<C> =
    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;

/// A `KeyValueStoreView` wrapped with historical hashing.
pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1204
/// A testing-only key-value store backed by a shared `KeyValueStoreView`.
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    /// The wrapped view; the `RwLock` allows concurrent readers.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1211
#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    // Store operations on the container report `ViewContainerError`.
    type Error = ViewContainerError;
}
1216
/// The error type of the `ViewContainer` testing store.
#[cfg(with_testing)]
#[derive(Error, Debug)]
pub enum ViewContainerError {
    /// An error from the underlying view.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1229
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    // Backend name reported in error contexts.
    const BACKEND: &'static str = "view_container";
}
1234
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    // Inherit the key-size limit from the wrapped context's store.
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        1
    }

    /// Reads a value by delegating to the wrapped view under a read lock.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        Ok(guard.get(key).await?)
    }

    /// Tests key existence by delegating to the wrapped view.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        let guard = self.view.read().await;
        Ok(guard.contains_key(key).await?)
    }

    /// Tests existence of several keys by delegating to the wrapped view.
    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
        let guard = self.view.read().await;
        Ok(guard.contains_keys(keys).await?)
    }

    /// Reads several values by delegating to the wrapped view.
    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        let guard = self.view.read().await;
        Ok(guard.multi_get(keys).await?)
    }

    /// Lists keys under a prefix by delegating to the wrapped view.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        let guard = self.view.read().await;
        Ok(guard.find_keys_by_prefix(key_prefix).await?)
    }

    /// Lists key/value pairs under a prefix by delegating to the wrapped view.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        let guard = self.view.read().await;
        Ok(guard.find_key_values_by_prefix(key_prefix).await?)
    }
}
1282
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    // Inherit the value-size limit from the wrapped context's store.
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies `batch` to the wrapped view, then immediately flushes the
    /// resulting changes to the underlying store.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut view = self.view.write().await;
        view.write_batch(batch).await?;
        // Flush the view's pending state into a fresh batch for the store.
        let mut flush_batch = Batch::new();
        view.flush(&mut flush_batch)?;
        view.context()
            .store()
            .write_batch(flush_batch)
            .await
            .map_err(ViewError::from)?;
        Ok(())
    }

    /// No journal is used by this testing store.
    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}
1304
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Creates a [`ViewContainer`] by loading the underlying view from `context`.
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let view = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(view)),
        })
    }
}