1use std::collections::{vec_deque::IterMut, VecDeque};
5
6use allocative::Allocative;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10
11use crate::{
12 batch::Batch,
13 common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
14 context::Context,
15 hashable_wrapper::WrappedHashableContainerView,
16 historical_hash_wrapper::HistoricallyHashableView,
17 store::ReadableKeyValueStore as _,
18 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
19};
20
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram measuring the runtime of `BucketQueueView::hash`.
    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "bucket_queue_view_hash_runtime",
            "BucketQueueView hash runtime",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });
}
38
/// Tags distinguishing the kinds of storage keys used by `BucketQueueView`.
#[repr(u8)]
enum KeyTag {
    /// Key of the front bucket (the bucket with index 0).
    Front = MIN_VIEW_TAG,
    /// Key of the `BucketStore` metadata (bucket descriptions and front position).
    Store,
    /// Tag used to derive the keys of the non-front buckets from their index.
    Index,
}
49
/// Metadata persisted under `KeyTag::Store`, describing the layout of all stored buckets.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct BucketStore {
    /// One description (length and storage index) per stored bucket, front bucket first.
    descriptions: Vec<BucketDescription>,
    /// Position of the queue's front element within the first stored bucket.
    front_position: usize,
}
59
/// Serialized description of a single stored bucket.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
struct BucketDescription {
    /// Number of values stored in the bucket.
    length: usize,
    /// Index used to derive the bucket's storage key (0 denotes the front bucket).
    index: usize,
}
68
impl BucketStore {
    /// Returns the number of stored buckets described by this metadata.
    fn len(&self) -> usize {
        self.descriptions.len()
    }
}
74
/// Position of the queue's current front element within the stored buckets.
#[derive(Copy, Clone, Debug, Allocative)]
struct Cursor {
    /// Index into `stored_buckets` of the bucket containing the front element.
    offset: usize,
    /// Position of the front element within that bucket.
    position: usize,
}
83
/// Whether a bucket's data has been read from storage yet.
#[derive(Clone, Debug, Allocative)]
enum State<T> {
    /// The bucket's values are present in memory.
    Loaded { data: Vec<T> },
    /// The bucket's values are still in storage; only its length is known.
    NotLoaded { length: usize },
}
90
91impl<T> Bucket<T> {
92 fn len(&self) -> usize {
93 match &self.state {
94 State::Loaded { data } => data.len(),
95 State::NotLoaded { length } => *length,
96 }
97 }
98
99 fn is_loaded(&self) -> bool {
100 match self.state {
101 State::Loaded { .. } => true,
102 State::NotLoaded { .. } => false,
103 }
104 }
105
106 fn to_description(&self) -> BucketDescription {
107 BucketDescription {
108 length: self.len(),
109 index: self.index,
110 }
111 }
112}
113
/// A bucket holding a contiguous run of queue values.
#[derive(Clone, Debug, Allocative)]
struct Bucket<T> {
    /// Index used to derive the bucket's storage key (0 denotes the front bucket).
    index: usize,
    /// The bucket's data, or just its length when not yet loaded from storage.
    state: State<T>,
}
122
/// A view implementing a queue whose entries are stored in buckets of at most
/// `N` values, so that reading and writing runs of elements needs fewer
/// storage accesses than one key per element.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative, const N: usize")]
pub struct BucketQueueView<C, T, const N: usize> {
    /// The storage context.
    #[allocative(skip)]
    context: C,
    /// The buckets currently persisted in storage, front bucket first.
    stored_buckets: VecDeque<Bucket<T>>,
    /// Values pushed since the last save; not yet persisted in storage.
    new_back_values: VecDeque<T>,
    /// The front position as currently recorded in storage.
    stored_front_position: usize,
    /// Current front of the stored part of the queue, or `None` when the
    /// stored part has been fully consumed.
    cursor: Option<Cursor>,
    /// Whether `clear` was called, so existing storage must be deleted on save.
    delete_storage_first: bool,
}
146
impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    // Two keys are prefetched at load time: the front bucket and the metadata.
    const NUM_INIT_KEYS: usize = 2;

    type Context = C;

    fn context(&self) -> C {
        self.context.clone()
    }

    /// Returns the keys to prefetch: the front bucket and the `BucketStore` metadata.
    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        let key1 = context.base_key().base_tag(KeyTag::Front as u8);
        let key2 = context.base_key().base_tag(KeyTag::Store as u8);
        Ok(vec![key1, key2])
    }

    /// Reconstructs the view from the values read for the keys of `pre_load`.
    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
        let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
        // Only the front bucket is deserialized eagerly.
        let front = from_bytes_option::<Vec<T>>(value1)?;
        let mut stored_buckets = VecDeque::from(match front {
            Some(data) => {
                let bucket = Bucket {
                    index: 0,
                    state: State::Loaded { data },
                };
                vec![bucket]
            }
            None => {
                vec![]
            }
        });
        let bucket_store = from_bytes_option_or_default::<BucketStore>(value2)?;
        // All buckets after the first stay `NotLoaded` until needed.
        for i in 1..bucket_store.len() {
            let length = bucket_store.descriptions[i].length;
            let index = bucket_store.descriptions[i].index;
            stored_buckets.push_back(Bucket {
                index,
                state: State::NotLoaded { length },
            });
        }
        let cursor = if bucket_store.descriptions.is_empty() {
            None
        } else {
            Some(Cursor {
                offset: 0,
                position: bucket_store.front_position,
            })
        };
        Ok(Self {
            context,
            stored_buckets,
            stored_front_position: bucket_store.front_position,
            new_back_values: VecDeque::new(),
            cursor,
            delete_storage_first: false,
        })
    }

    /// Discards all in-memory changes, restoring the state as loaded from storage.
    fn rollback(&mut self) {
        self.delete_storage_first = false;
        self.cursor = if self.stored_buckets.is_empty() {
            None
        } else {
            Some(Cursor {
                offset: 0,
                position: self.stored_front_position,
            })
        };
        self.new_back_values.clear();
    }

    /// Returns whether saving the view would write anything to storage.
    async fn has_pending_changes(&self) -> bool {
        if self.delete_storage_first {
            return true;
        }
        if !self.stored_buckets.is_empty() {
            // A `None` cursor or a moved cursor means front elements were deleted.
            let Some(cursor) = self.cursor else {
                return true;
            };
            if cursor.offset != 0 || cursor.position != self.stored_front_position {
                return true;
            }
        }
        !self.new_back_values.is_empty()
    }

    /// Stages the pending changes into `batch`. Returns `true` when the whole
    /// view ends up deleted from storage.
    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        let mut descriptions = Vec::new();
        let mut stored_front_position = self.stored_front_position;
        if self.stored_count() == 0 {
            // Nothing of the stored part survives: wipe everything under the prefix.
            let key_prefix = self.context.base_key().bytes.clone();
            batch.delete_key_prefix(key_prefix);
            delete_view = true;
            stored_front_position = 0;
        } else if let Some(cursor) = self.cursor {
            // Delete the buckets that were entirely consumed.
            for i in 0..cursor.offset {
                let bucket = &self.stored_buckets[i];
                let index = bucket.index;
                let key = self.get_bucket_key(index)?;
                batch.delete_key(key);
            }
            stored_front_position = cursor.position;
            let first_index = self.stored_buckets[cursor.offset].index;
            // The surviving front bucket must live under index 0; rewrite it there
            // if it currently has another index.
            let start_offset = if first_index != 0 {
                let key = self.get_bucket_key(first_index)?;
                batch.delete_key(key);
                let key = self.get_bucket_key(0)?;
                let bucket = &self.stored_buckets[cursor.offset];
                let State::Loaded { data } = &bucket.state else {
                    unreachable!("The front bucket is always loaded.");
                };
                batch.put_key_value(key, data)?;
                descriptions.push(BucketDescription {
                    length: bucket.len(),
                    index: 0,
                });
                cursor.offset + 1
            } else {
                cursor.offset
            };
            for bucket in self.stored_buckets.range(start_offset..) {
                descriptions.push(bucket.to_description());
            }
        }
        if !self.new_back_values.is_empty() {
            // New values keep the view alive even if the stored part is gone.
            delete_view = false;
            let mut index = if self.stored_count() == 0 {
                0
            } else if let Some(last_description) = descriptions.last() {
                last_description.index + 1
            } else {
                0
            };
            // Write the new values as buckets of at most `N` elements each.
            let mut start = 0;
            while start < self.new_back_values.len() {
                let end = std::cmp::min(start + N, self.new_back_values.len());
                let value_chunk: Vec<_> = self.new_back_values.range(start..end).collect();
                let key = self.get_bucket_key(index)?;
                batch.put_key_value(key, &value_chunk)?;
                descriptions.push(BucketDescription {
                    index,
                    length: end - start,
                });
                index += 1;
                start = end;
            }
        }
        if !delete_view {
            let bucket_store = BucketStore {
                descriptions,
                front_position: stored_front_position,
            };
            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
            batch.put_key_value(key, &bucket_store)?;
        }
        Ok(delete_view)
    }

    /// Updates the in-memory state to mirror what `pre_save` staged to storage.
    fn post_save(&mut self) {
        if self.stored_count() == 0 {
            self.stored_buckets.clear();
            self.stored_front_position = 0;
            self.cursor = None;
        } else if let Some(cursor) = self.cursor {
            // Drop fully-consumed buckets and reset the cursor to the new front.
            for _ in 0..cursor.offset {
                self.stored_buckets.pop_front();
            }
            self.cursor = Some(Cursor {
                offset: 0,
                position: cursor.position,
            });
            self.stored_front_position = cursor.position;
            // Mirrors the index-0 rewrite done in `pre_save`.
            self.stored_buckets[0].index = 0;
        }
        if !self.new_back_values.is_empty() {
            let mut index = match self.stored_buckets.back() {
                Some(bucket) => bucket.index + 1,
                None => 0,
            };
            // The freshly written values now count as stored buckets.
            let new_back_values = std::mem::take(&mut self.new_back_values);
            let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
            for value_chunk in new_back_values.chunks(N) {
                self.stored_buckets.push_back(Bucket {
                    index,
                    state: State::Loaded {
                        data: value_chunk.to_vec(),
                    },
                });
                index += 1;
            }
            if self.cursor.is_none() {
                self.cursor = Some(Cursor {
                    offset: 0,
                    position: 0,
                });
            }
        }
        self.delete_storage_first = false;
    }

    /// Empties the queue; the deletion takes effect in storage at the next save.
    fn clear(&mut self) {
        self.delete_storage_first = true;
        self.new_back_values.clear();
        self.cursor = None;
    }
}
368
369impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
370where
371 Self: View,
372{
373 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
374 Ok(BucketQueueView {
375 context: self.context.clone(),
376 stored_buckets: self.stored_buckets.clone(),
377 new_back_values: self.new_back_values.clone(),
378 stored_front_position: self.stored_front_position,
379 cursor: self.cursor,
380 delete_storage_first: self.delete_storage_first,
381 })
382 }
383}
384
385impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
386 fn get_bucket_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
388 Ok(if index == 0 {
389 self.context.base_key().base_tag(KeyTag::Front as u8)
390 } else {
391 self.context
392 .base_key()
393 .derive_tag_key(KeyTag::Index as u8, &index)?
394 })
395 }
396
397 pub fn stored_count(&self) -> usize {
410 if self.delete_storage_first {
411 0
412 } else {
413 let Some(cursor) = self.cursor else {
414 return 0;
415 };
416 let mut stored_count = 0;
417 for offset in cursor.offset..self.stored_buckets.len() {
418 stored_count += self.stored_buckets[offset].len();
419 }
420 stored_count -= cursor.position;
421 stored_count
422 }
423 }
424
425 pub fn count(&self) -> usize {
438 self.stored_count() + self.new_back_values.len()
439 }
440}
441
impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
    /// Returns a reference to the first element of the queue, if any.
    pub fn front(&self) -> Option<&T> {
        match self.cursor {
            Some(Cursor { offset, position }) => {
                let bucket = &self.stored_buckets[offset];
                let State::Loaded { data } = &bucket.state else {
                    unreachable!("The front bucket should always be loaded");
                };
                Some(&data[position])
            }
            None => self.new_back_values.front(),
        }
    }

    /// Returns a mutable reference to the first element of the queue, if any.
    pub fn front_mut(&mut self) -> Option<&mut T> {
        match self.cursor {
            Some(Cursor { offset, position }) => {
                let bucket = self
                    .stored_buckets
                    .get_mut(offset)
                    .expect("cursor.offset must be a valid index into stored_buckets");
                let State::Loaded { data } = &mut bucket.state else {
                    unreachable!("The front bucket should always be loaded");
                };
                Some(
                    data.get_mut(position)
                        .expect("cursor.position must be a valid index within the front bucket"),
                )
            }
            None => self.new_back_values.front_mut(),
        }
    }

    /// Removes the first element of the queue, loading the next bucket from
    /// storage when the current front bucket is exhausted.
    pub async fn delete_front(&mut self) -> Result<(), ViewError> {
        match self.cursor {
            Some(cursor) => {
                let mut offset = cursor.offset;
                let mut position = cursor.position + 1;
                // Advance to the next bucket if this one is exhausted.
                if self.stored_buckets[offset].len() == position {
                    offset += 1;
                    position = 0;
                }
                if offset == self.stored_buckets.len() {
                    // The stored part of the queue is now fully consumed.
                    self.cursor = None;
                } else {
                    // Preserve the invariant that the front bucket is loaded.
                    if !self.stored_buckets[offset].is_loaded() {
                        let index = self.stored_buckets[offset].index;
                        let key = self.get_bucket_key(index)?;
                        let data = self.context.store().read_value(&key).await?;
                        let data = match data {
                            Some(value) => value,
                            None => {
                                // Note: the cursor is left unchanged on failure.
                                return Err(ViewError::MissingEntries(
                                    "BucketQueueView::delete_front".into(),
                                ));
                            }
                        };
                        self.stored_buckets[offset].state = State::Loaded { data };
                    }
                    self.cursor = Some(Cursor { offset, position });
                }
            }
            None => {
                self.new_back_values.pop_front();
            }
        }
        Ok(())
    }

    /// Appends a value at the back of the queue. It is persisted at the next save.
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }

    /// Reads all elements of the queue, in order.
    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
        let count = self.count();
        self.read_context(self.cursor, count).await
    }

    /// Returns the last element of the queue, if any, loading the last stored
    /// bucket from storage when needed.
    pub async fn back(&mut self) -> Result<Option<T>, ViewError>
    where
        T: Clone,
    {
        if let Some(value) = self.new_back_values.back() {
            return Ok(Some(value.clone()));
        }
        // `cursor == None` means the stored part is fully consumed.
        if self.cursor.is_none() {
            return Ok(None);
        }
        let Some(bucket) = self.stored_buckets.back() else {
            return Ok(None);
        };
        match &bucket.state {
            State::Loaded { data } => Ok(Some(
                data.last().expect("a stored bucket is never empty").clone(),
            )),
            State::NotLoaded { .. } => {
                let key = self.get_bucket_key(bucket.index)?;
                let data = self
                    .context
                    .store()
                    .read_value::<Vec<T>>(&key)
                    .await?
                    .ok_or_else(|| ViewError::MissingEntries("BucketQueueView::back".into()))?;
                let result = data.last().expect("a stored bucket is never empty").clone();
                // Cache the loaded data for later accesses.
                self.stored_buckets
                    .back_mut()
                    .expect("stored_buckets is non-empty since we just accessed its back element")
                    .state = State::Loaded { data };
                Ok(Some(result))
            }
        }
    }

    /// Reads up to `count` elements starting from `cursor`, fetching all the
    /// required unloaded buckets from storage in one multi-read.
    async fn read_context(
        &self,
        cursor: Option<Cursor>,
        count: usize,
    ) -> Result<Vec<T>, ViewError> {
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut elements = Vec::<T>::new();
        let mut count_remain = count;
        if let Some(cursor) = cursor {
            // First pass: collect the keys of the unloaded buckets we will need.
            let mut keys = Vec::new();
            let mut position = cursor.position;
            for offset in cursor.offset..self.stored_buckets.len() {
                let bucket = &self.stored_buckets[offset];
                let size = bucket.len() - position;
                if !bucket.is_loaded() {
                    let key = self.get_bucket_key(bucket.index)?;
                    keys.push(key);
                };
                if size >= count_remain {
                    break;
                }
                count_remain -= size;
                position = 0;
            }
            let values = self.context.store().read_multi_values_bytes(&keys).await?;
            // Second pass: extract the elements, deserializing the fetched buckets.
            let mut value_pos = 0;
            count_remain = count;
            let mut position = cursor.position;
            for offset in cursor.offset..self.stored_buckets.len() {
                let bucket = &self.stored_buckets[offset];
                let size = bucket.len() - position;
                let data = match &bucket.state {
                    State::Loaded { data } => data,
                    State::NotLoaded { .. } => {
                        let value = match &values[value_pos] {
                            Some(value) => value,
                            None => {
                                return Err(ViewError::MissingEntries(
                                    "BucketQueueView::read_context".into(),
                                ));
                            }
                        };
                        value_pos += 1;
                        &bcs::from_bytes::<Vec<T>>(value)?
                    }
                };
                elements.extend(data[position..].iter().take(count_remain).cloned());
                if size >= count_remain {
                    return Ok(elements);
                }
                count_remain -= size;
                position = 0;
            }
        }
        // Complete with the not-yet-saved values at the back of the queue.
        let count_read = std::cmp::min(count_remain, self.new_back_values.len());
        elements.extend(self.new_back_values.range(0..count_read).cloned());
        Ok(elements)
    }

    /// Reads the first `count` elements of the queue (fewer if the queue is shorter).
    pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
        let count = std::cmp::min(count, self.count());
        self.read_context(self.cursor, count).await
    }

    /// Reads the last `count` elements of the queue (fewer if the queue is shorter).
    pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
        let count = std::cmp::min(count, self.count());
        if count <= self.new_back_values.len() {
            // All the requested elements are among the pending back values.
            let start = self.new_back_values.len() - count;
            Ok(self
                .new_back_values
                .range(start..)
                .cloned()
                .collect::<Vec<_>>())
        } else {
            // Skip `increment` elements from the front, then read the rest.
            let mut increment = self.count() - count;
            let Some(cursor) = self.cursor else {
                unreachable!("Cursor should be Some when stored_count > 0");
            };
            let mut position = cursor.position;
            for offset in cursor.offset..self.stored_buckets.len() {
                let size = self.stored_buckets[offset].len() - position;
                if increment < size {
                    return self
                        .read_context(
                            Some(Cursor {
                                offset,
                                position: position + increment,
                            }),
                            count,
                        )
                        .await;
                }
                increment -= size;
                position = 0;
            }
            unreachable!("BucketQueueView::read_back: iterated past all stored buckets without finding the requested position");
        }
    }

    /// Moves every element into `new_back_values` and marks the stored part
    /// for deletion, so the whole queue is held in memory.
    async fn load_all(&mut self) -> Result<(), ViewError> {
        if !self.delete_storage_first {
            let elements = self.elements().await?;
            self.new_back_values.clear();
            for elt in elements {
                self.new_back_values.push_back(elt);
            }
            self.cursor = None;
            self.delete_storage_first = true;
        }
        Ok(())
    }

    /// Returns a mutable iterator over all elements; this loads the whole
    /// queue into memory first (see `load_all`).
    pub async fn try_iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
}
796
impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
    for BucketQueueView<C, T, N>
where
    Self: View,
{
    type Hasher = sha3::Sha3_256;

    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        // Hashing does not require mutation; delegate to the shared implementation.
        self.hash().await
    }

    /// Hashes the BCS serialization of all the queue's elements.
    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}
817
/// A `BucketQueueView` wrapped so that its hash is computed and cached.
pub type HashedBucketQueueView<C, T, const N: usize> =
    WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
821
/// A `BucketQueueView` wrapped in [`HistoricallyHashableView`].
pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
    HistoricallyHashableView<C, BucketQueueView<C, T, N>>;
825
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::BucketQueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
        for BucketQueueView<C, T, N>
    {
        /// Builds a GraphQL type name that is unique to the element type `T`.
        fn type_name() -> Cow<'static, str> {
            format!(
                "BucketQueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
    where
        C: Send + Sync,
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        /// Returns the number of elements in the queue.
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            Ok(self.count() as u32)
        }

        /// Returns the first `count` elements of the queue (all of them when
        /// `count` is not given).
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}
867
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        batch::Batch,
        context::{Context, MemoryContext},
        store::WritableKeyValueStore as _,
    };

    /// Checks that a failed bucket load inside `delete_front` leaves the view
    /// in a consistent state that can still be saved.
    #[tokio::test]
    async fn delete_front_load_failure_preserves_invariant() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;
        for value in [1u8, 2, 3, 4] {
            view.push_back(value);
        }
        save(&context, &mut view).await?;

        let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;

        // Remove the second bucket from storage behind the view's back.
        let bucket1_key = view.get_bucket_key(1)?;
        let mut batch = Batch::new();
        batch.delete_key(bucket1_key);
        context.store().write_batch(batch).await?;

        // The first deletion stays in bucket 0; the second needs bucket 1 and fails.
        view.delete_front().await?;
        let err = view.delete_front().await.expect_err("load should fail");
        assert!(matches!(err, ViewError::MissingEntries(_)));

        // Saving must still succeed after the failed load.
        save(&context, &mut view).await?;

        Ok(())
    }

    /// Persists the view's pending changes through a fresh batch.
    async fn save<V: View>(context: &V::Context, view: &mut V) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        context.store().write_batch(batch).await?;
        view.post_save();
        Ok(())
    }
}