1use std::{collections::BTreeMap, fmt::Debug, mem, ops::Bound::Included, sync::Mutex};
17
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{data_types::ArithmeticError, ensure};
21use serde::{Deserialize, Serialize};
22
23use crate::{
24 batch::{Batch, WriteOperation},
25 common::{
26 from_bytes_option, from_bytes_option_or_default, get_interval, get_upper_bound,
27 DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28 },
29 context::Context,
30 map_view::ByteMapView,
31 store::ReadableKeyValueStore,
32 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
33};
34
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus histograms tracking the latency of each `KeyValueStoreView`
    //! operation. All histograms use the same exponential bucket layout.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Latency of computing the view's hash.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// Latency of `get`.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// Latency of `multi_get`.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `contains_key`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `contains_keys`.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `find_keys_by_prefix`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `find_key_values_by_prefix`.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `write_batch`.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });
}
128
129#[cfg(with_testing)]
130use {
131 crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
132 async_lock::RwLock,
133 std::sync::Arc,
134 thiserror::Error,
135};
136
/// Tags distinguishing the kinds of entries stored under this view's base key.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the stored indices (the user-visible key-value pairs).
    Index = MIN_VIEW_TAG,
    /// Prefix for the persisted total size.
    TotalSize,
    /// Prefix for the inner `sizes` map entries.
    Sizes,
    /// Prefix for the persisted hash.
    Hash,
}
148
/// Byte sizes of a key and of its associated value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SizeData {
    /// Size of the key, in bytes.
    pub key: u32,
    /// Size of the value, in bytes.
    pub value: u32,
}
157
158impl SizeData {
159 pub fn sum(&mut self) -> u32 {
161 self.key + self.value
162 }
163
164 pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
166 self.key = self
167 .key
168 .checked_add(size.key)
169 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
170 self.value = self
171 .value
172 .checked_add(size.value)
173 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
174 Ok(())
175 }
176
177 pub fn sub_assign(&mut self, size: SizeData) {
179 self.key -= size.key;
180 self.value -= size.value;
181 }
182}
183
/// A view that emulates a key-value store: reads merge pending in-memory
/// updates with the persisted state, and writes are buffered until `flush`.
#[derive(Debug)]
pub struct KeyValueStoreView<C> {
    /// The context giving access to the base key and the underlying store.
    context: C,
    /// Tracks pending `clear`/prefix deletions not yet flushed.
    deletion_set: DeletionSet,
    /// Pending insertions and removals, keyed by index.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// Total size as currently persisted in storage.
    stored_total_size: SizeData,
    /// Total size including pending (unflushed) updates.
    total_size: SizeData,
    /// Maps each stored key to the byte size of its value.
    sizes: ByteMapView<C, u32>,
    /// Hash as currently persisted in storage, if any.
    stored_hash: Option<HasherOutput>,
    /// Cached hash of the current state; `None` when invalidated by a write.
    hash: Mutex<Option<HasherOutput>>,
}
212
impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
    type Target = KeyValueStoreView<C2>;

    /// Rebuilds this view over a new context derived from the current one.
    async fn with_context(
        &mut self,
        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
    ) -> Self::Target {
        // Copy the cached hash out of the mutex before building the new view.
        let hash = *self.hash.lock().unwrap();
        KeyValueStoreView {
            // `ctx` is cloned first because it is needed again below for the
            // inner `sizes` view; `clone()(…)` invokes the cloned closure.
            context: ctx.clone()(self.context()),
            deletion_set: self.deletion_set.clone(),
            updates: self.updates.clone(),
            stored_total_size: self.stored_total_size,
            total_size: self.total_size,
            sizes: self.sizes.with_context(ctx.clone()).await,
            stored_hash: self.stored_hash,
            hash: Mutex::new(hash),
        }
    }
}
233
impl<C: Context> View for KeyValueStoreView<C> {
    // Two keys of our own (hash, total size) plus those of the inner `sizes` view.
    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        // Order matters: `post_load` reads the hash first, the total size
        // second, then the keys of the inner `sizes` view.
        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
        let mut v = vec![key_hash, key_total_size];
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
        Ok(v)
    }

    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        let total_size =
            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        let sizes = ByteMapView::post_load(
            context_sizes,
            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
        )?;
        Ok(Self {
            context,
            deletion_set: DeletionSet::new(),
            updates: BTreeMap::new(),
            // Freshly loaded: in-memory totals and hash match storage.
            stored_total_size: total_size,
            total_size,
            sizes,
            stored_hash: hash,
            hash: Mutex::new(hash),
        })
    }

    async fn load(context: C) -> Result<Self, ViewError> {
        let keys = Self::pre_load(&context)?;
        let values = context.store().read_multi_values_bytes(keys).await?;
        Self::post_load(context, &values)
    }

    fn rollback(&mut self) {
        // Discard all pending changes and restore the stored state.
        self.deletion_set.rollback();
        self.updates.clear();
        self.total_size = self.stored_total_size;
        self.sizes.rollback();
        *self.hash.get_mut().unwrap() = self.stored_hash;
    }

    async fn has_pending_changes(&self) -> bool {
        if self.deletion_set.has_pending_changes() {
            return true;
        }
        if !self.updates.is_empty() {
            return true;
        }
        if self.stored_total_size != self.total_size {
            return true;
        }
        if self.sizes.has_pending_changes().await {
            return true;
        }
        // The cached hash differing from the stored one also counts as a change.
        let hash = self.hash.lock().unwrap();
        self.stored_hash != *hash
    }

    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.deletion_set.delete_storage_first {
            // Everything in storage is stale: wipe the whole key space, then
            // re-insert the pending `Set` updates.
            delete_view = true;
            self.stored_total_size = SizeData::default();
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            for (index, update) in mem::take(&mut self.updates) {
                if let Update::Set(value) = update {
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    batch.put_key_value_bytes(key, value);
                    // At least one entry survives, so the view itself remains.
                    delete_view = false;
                }
            }
            self.stored_hash = None
        } else {
            // Apply pending prefix deletions first, then individual updates.
            for index in mem::take(&mut self.deletion_set.deleted_prefixes) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                batch.delete_key_prefix(key);
            }
            for (index, update) in mem::take(&mut self.updates) {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, &index);
                match update {
                    Update::Removed => batch.delete_key(key),
                    Update::Set(value) => batch.put_key_value_bytes(key, value),
                }
            }
        }
        self.sizes.flush(batch)?;
        // Persist the hash and the total size if either changed.
        let hash = *self.hash.get_mut().unwrap();
        if self.stored_hash != hash {
            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
            match hash {
                None => batch.delete_key(key),
                Some(hash) => batch.put_key_value(key, &hash)?,
            }
            self.stored_hash = hash;
        }
        if self.stored_total_size != self.total_size {
            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
            batch.put_key_value(key, &self.total_size)?;
            self.stored_total_size = self.total_size;
        }
        self.deletion_set.delete_storage_first = false;
        Ok(delete_view)
    }

    fn clear(&mut self) {
        // Mark everything for deletion and reset the in-memory state.
        self.deletion_set.clear();
        self.updates.clear();
        self.total_size = SizeData::default();
        self.sizes.clear();
        *self.hash.get_mut().unwrap() = None;
    }
}
369
impl<C: Context> ClonableView for KeyValueStoreView<C> {
    /// Clones the view, including its pending (unflushed) state.
    fn clone_unchecked(&mut self) -> Self {
        KeyValueStoreView {
            context: self.context.clone(),
            deletion_set: self.deletion_set.clone(),
            updates: self.updates.clone(),
            stored_total_size: self.stored_total_size,
            total_size: self.total_size,
            sizes: self.sizes.clone_unchecked(),
            stored_hash: self.stored_hash,
            // `get_mut` avoids locking: we have exclusive access via `&mut self`.
            hash: Mutex::new(*self.hash.get_mut().unwrap()),
        }
    }
}
384
385impl<C: Context> KeyValueStoreView<C> {
386 fn max_key_size(&self) -> usize {
387 let prefix_len = self.context.base_key().bytes.len();
388 <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
389 }
390
    /// Returns the total size (keys + values) currently accounted for,
    /// including unflushed updates.
    pub fn total_size(&self) -> SizeData {
        self.total_size
    }
406
    /// Applies the function `f` to each index (key) in lexicographic order,
    /// stopping early when `f` returns `false`. Pending updates are merged
    /// with the stored keys during iteration.
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
            {
                loop {
                    match update {
                        // Pending updates sorting before (or at) the stored
                        // index are emitted first; `Removed` entries shadow
                        // the stored key and are skipped.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                // The update shadowed this stored index.
                                break;
                            }
                        }
                        _ => {
                            // Stored index, unless masked by a deleted prefix.
                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit any remaining pending insertions past the last stored key.
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }
478
479 pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
501 where
502 F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
503 {
504 self.for_each_index_while(|key| {
505 f(key)?;
506 Ok(true)
507 })
508 .await
509 }
510
    /// Applies the function `f` to each (index, value) pair in lexicographic
    /// order of the index, stopping early when `f` returns `false`. Pending
    /// updates are merged with the stored entries during iteration.
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
            {
                let (index, index_val) = entry;
                loop {
                    match update {
                        // Pending updates sorting before (or at) the stored
                        // index take precedence; removals are skipped.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                // The update shadowed the stored entry.
                                break;
                            }
                        }
                        _ => {
                            // Stored entry, unless masked by a deleted prefix.
                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit any remaining pending insertions past the last stored entry.
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }
582
583 pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
604 where
605 F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
606 {
607 self.for_each_index_value_while(|key, value| {
608 f(key, value)?;
609 Ok(true)
610 })
611 .await
612 }
613
614 pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
629 let mut indices = Vec::new();
630 self.for_each_index(|index| {
631 indices.push(index.to_vec());
632 Ok(())
633 })
634 .await?;
635 Ok(indices)
636 }
637
638 pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
653 let mut index_values = Vec::new();
654 self.for_each_index_value(|index, value| {
655 index_values.push((index.to_vec(), value.to_vec()));
656 Ok(())
657 })
658 .await?;
659 Ok(index_values)
660 }
661
662 pub async fn count(&self) -> Result<usize, ViewError> {
677 let mut count = 0;
678 self.for_each_index(|_index| {
679 count += 1;
680 Ok(())
681 })
682 .await?;
683 Ok(count)
684 }
685
686 pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
700 #[cfg(with_metrics)]
701 let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
702 ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
703 if let Some(update) = self.updates.get(index) {
704 let value = match update {
705 Update::Removed => None,
706 Update::Set(value) => Some(value.clone()),
707 };
708 return Ok(value);
709 }
710 if self.deletion_set.contains_prefix_of(index) {
711 return Ok(None);
712 }
713 let key = self
714 .context
715 .base_key()
716 .base_tag_index(KeyTag::Index as u8, index);
717 Ok(self.context.store().read_value_bytes(&key).await?)
718 }
719
720 pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
734 #[cfg(with_metrics)]
735 let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
736 ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
737 if let Some(update) = self.updates.get(index) {
738 let test = match update {
739 Update::Removed => false,
740 Update::Set(_value) => true,
741 };
742 return Ok(test);
743 }
744 if self.deletion_set.contains_prefix_of(index) {
745 return Ok(false);
746 }
747 let key = self
748 .context
749 .base_key()
750 .base_tag_index(KeyTag::Index as u8, index);
751 Ok(self.context.store().contains_key(&key).await?)
752 }
753
    /// Tests whether each of the given indices is present, returning one
    /// `bool` per index in the same order.
    pub async fn contains_keys(&self, indices: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
        let mut results = Vec::with_capacity(indices.len());
        // Positions whose answer must come from storage, and the store keys to query.
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                // A pending update settles the answer locally.
                let value = match update {
                    Update::Removed => false,
                    Update::Set(_) => true,
                };
                results.push(value);
            } else {
                // Tentatively absent; may be overwritten by the store lookup below.
                results.push(false);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self.context.store().contains_keys(vector_query).await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            results[i] = value;
        }
        Ok(results)
    }
800
    /// Reads the values at the given indices, returning one `Option` per
    /// index in the same order.
    pub async fn multi_get(
        &self,
        indices: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
        let mut result = Vec::with_capacity(indices.len());
        // Positions that must be resolved from storage, and the store keys to query.
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.into_iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(&index) {
                // A pending update settles the answer locally.
                let value = match update {
                    Update::Removed => None,
                    Update::Set(value) => Some(value.clone()),
                };
                result.push(value);
            } else {
                // Tentatively `None`; may be overwritten by the store lookup below.
                result.push(None);
                if !self.deletion_set.contains_prefix_of(&index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, &index);
                    vector_query.push(key);
                }
            }
        }
        let values = self
            .context
            .store()
            .read_multi_values_bytes(vector_query)
            .await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            result[i] = value;
        }
        Ok(result)
    }
855
856 pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
875 #[cfg(with_metrics)]
876 let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
877 *self.hash.get_mut().unwrap() = None;
878 let max_key_size = self.max_key_size();
879 for operation in batch.operations {
880 match operation {
881 WriteOperation::Delete { key } => {
882 ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
883 if let Some(value) = self.sizes.get(&key).await? {
884 let entry_size = SizeData {
885 key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
886 value,
887 };
888 self.total_size.sub_assign(entry_size);
889 }
890 self.sizes.remove(key.clone());
891 if self.deletion_set.contains_prefix_of(&key) {
892 self.updates.remove(&key);
894 } else {
895 self.updates.insert(key, Update::Removed);
896 }
897 }
898 WriteOperation::Put { key, value } => {
899 ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
900 let entry_size = SizeData {
901 key: key.len() as u32,
902 value: value.len() as u32,
903 };
904 self.total_size.add_assign(entry_size)?;
905 if let Some(value) = self.sizes.get(&key).await? {
906 let entry_size = SizeData {
907 key: key.len() as u32,
908 value,
909 };
910 self.total_size.sub_assign(entry_size);
911 }
912 self.sizes.insert(key.clone(), entry_size.value);
913 self.updates.insert(key, Update::Set(value));
914 }
915 WriteOperation::DeletePrefix { key_prefix } => {
916 ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
917 let key_list = self
918 .updates
919 .range(get_interval(key_prefix.clone()))
920 .map(|x| x.0.to_vec())
921 .collect::<Vec<_>>();
922 for key in key_list {
923 self.updates.remove(&key);
924 }
925 let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
926 for (key, value) in key_values {
927 let entry_size = SizeData {
928 key: key.len() as u32,
929 value,
930 };
931 self.total_size.sub_assign(entry_size);
932 self.sizes.remove(key);
933 }
934 self.sizes.remove_by_prefix(key_prefix.clone());
935 self.deletion_set.insert_key_prefix(key_prefix);
936 }
937 }
938 }
939 Ok(())
940 }
941
942 pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
955 let mut batch = Batch::new();
956 batch.put_key_value_bytes(index, value);
957 self.write_batch(batch).await
958 }
959
960 pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
974 let mut batch = Batch::new();
975 batch.delete_key(index);
976 self.write_batch(batch).await
977 }
978
979 pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
993 let mut batch = Batch::new();
994 batch.delete_key_prefix(key_prefix);
995 self.write_batch(batch).await
996 }
997
    /// Returns the keys matching the given prefix, with the prefix stripped,
    /// in lexicographic order. Pending updates are merged with the stored keys.
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        // `len` is the prefix length, used to strip the prefix from updates.
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        // Restrict the pending updates to those falling under the prefix.
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // Stored keys come back with the prefix already stripped.
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        // Pending updates sorting before (or at) the stored
                        // key are emitted first; removals are skipped.
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                // The update shadowed this stored key.
                                break;
                            }
                        }
                        _ => {
                            // Deletion prefixes apply to the full (unstripped) key.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit any remaining pending insertions past the last stored key.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }
1073
    /// Returns the (key, value) pairs matching the given prefix, with the
    /// prefix stripped from the keys, in lexicographic order. Pending updates
    /// are merged with the stored entries.
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        // `len` is the prefix length, used to strip the prefix from updates.
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        // Restrict the pending updates to those falling under the prefix.
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // Stored entries come back with the prefix already stripped.
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                loop {
                    match update {
                        // Pending updates sorting before (or at) the stored
                        // key are emitted first; removals are skipped.
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                // The update shadowed this stored entry.
                                break;
                            }
                        }
                        _ => {
                            // Deletion prefixes apply to the full (unstripped) key.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit any remaining pending insertions past the last stored entry.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }
1154
    /// Computes the hash of the view's full contents by hashing every
    /// (index, value) pair in lexicographic order.
    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
        let mut hasher = sha3::Sha3_256::default();
        let mut count = 0u32;
        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
            count += 1;
            hasher.update_with_bytes(index)?;
            hasher.update_with_bytes(value)?;
            Ok(())
        })
        .await?;
        // The entry count is hashed last, so the hash also commits to the
        // number of pairs.
        hasher.update_with_bcs_bytes(&count)?;
        Ok(hasher.finalize())
    }
1170}
1171
1172impl<C: Context> HashableView for KeyValueStoreView<C> {
1173 type Hasher = sha3::Sha3_256;
1174
1175 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1176 let hash = *self.hash.get_mut().unwrap();
1177 match hash {
1178 Some(hash) => Ok(hash),
1179 None => {
1180 let new_hash = self.compute_hash().await?;
1181 let hash = self.hash.get_mut().unwrap();
1182 *hash = Some(new_hash);
1183 Ok(new_hash)
1184 }
1185 }
1186 }
1187
1188 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1189 let hash = *self.hash.lock().unwrap();
1190 match hash {
1191 Some(hash) => Ok(hash),
1192 None => {
1193 let new_hash = self.compute_hash().await?;
1194 let mut hash = self.hash.lock().unwrap();
1195 *hash = Some(new_hash);
1196 Ok(new_hash)
1197 }
1198 }
1199 }
1200}
1201
/// A testing-only key-value store backed by a shared [`KeyValueStoreView`].
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    /// The wrapped view, shared behind an async read-write lock.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1208
#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    // Errors surface as `ViewContainerError`, not the inner store's error type.
    type Error = ViewContainerError;
}
1213
/// The error type of [`ViewContainer`].
#[cfg(with_testing)]
#[derive(Error, Debug)]
pub enum ViewContainerError {
    /// An error propagated from the wrapped view.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1226
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    // Identifies this backend in error reporting.
    const BACKEND: &'static str = "view_container";
}
1231
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        1
    }

    /// Every read takes the shared lock and delegates to the wrapped view.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        Ok(self.view.read().await.get(key).await?)
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        Ok(self.view.read().await.contains_key(key).await?)
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ViewContainerError> {
        Ok(self.view.read().await.contains_keys(keys).await?)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        Ok(self.view.read().await.multi_get(keys).await?)
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        Ok(self.view.read().await.find_keys_by_prefix(key_prefix).await?)
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        Ok(self
            .view
            .read()
            .await
            .find_key_values_by_prefix(key_prefix)
            .await?)
    }
}
1279
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies the batch to the in-memory view, then flushes the resulting
    /// changes to the backing store in a single batch.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut guard = self.view.write().await;
        guard.write_batch(batch).await?;
        let mut flush_batch = Batch::new();
        guard.flush(&mut flush_batch)?;
        guard
            .context()
            .store()
            .write_batch(flush_batch)
            .await
            .map_err(ViewError::from)?;
        Ok(())
    }

    /// No journal to clear for this in-memory wrapper.
    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}
1301
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Creates a [`ViewContainer`] backed by a freshly loaded view.
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let inner = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(inner)),
        })
    }
}