1use std::collections::{vec_deque::IterMut, VecDeque};
5
6use allocative::Allocative;
7use linera_base::data_types::ArithmeticError;
8#[cfg(with_metrics)]
9use linera_base::prometheus_util::MeasureLatency as _;
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11
12use crate::{
13 batch::Batch,
14 common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
15 context::Context,
16 hashable_wrapper::WrappedHashableContainerView,
17 historical_hash_wrapper::HistoricallyHashableView,
18 store::ReadableKeyValueStore as _,
19 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
20};
21
22#[cfg(with_metrics)]
23mod metrics {
24 use std::sync::LazyLock;
25
26 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
27 use prometheus::HistogramVec;
28
29 pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
31 register_histogram_vec(
32 "bucket_queue_view_hash_runtime",
33 "BucketQueueView hash runtime",
34 &[],
35 exponential_bucket_latencies(5.0),
36 )
37 });
38}
39
40#[repr(u8)]
42enum KeyTag {
43 Layout = MIN_VIEW_TAG,
45 Front,
47 Middle,
49 Back,
51}
52
53#[derive(Clone, Debug, Default, Serialize, Deserialize)]
59struct BucketLayout {
60 front_position: u32,
62 num_buckets: u32,
64 first_index: u32,
67}
68
69#[derive(Copy, Clone, Debug, Allocative)]
71struct Cursor {
72 offset: usize,
75 position: usize,
77}
78
79#[derive(Debug, Allocative)]
91#[allocative(bound = "C, T: Allocative, const N: usize")]
92pub struct BucketQueueView<C, T, const N: usize> {
93 #[allocative(skip)]
95 context: C,
96 new_back_values: VecDeque<T>,
98 stored_first_index: u32,
100 stored_num_buckets: u32,
102 stored_front: Vec<T>,
109 stored_back: Vec<T>,
112 stored_front_position: u32,
114 cursor: Option<Cursor>,
121 current_middle: Option<Vec<T>>,
125 delete_storage_first: bool,
127}
128
129impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
130where
131 C: Context,
132 T: Send + Sync + Clone + Serialize + DeserializeOwned,
133{
134 const NUM_INIT_KEYS: usize = 3;
135
136 type Context = C;
137
138 fn context(&self) -> C {
139 self.context.clone()
140 }
141
142 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
143 let key1 = context.base_key().base_tag(KeyTag::Layout as u8);
144 let key2 = context.base_key().base_tag(KeyTag::Front as u8);
145 let key3 = context.base_key().base_tag(KeyTag::Back as u8);
146 Ok(vec![key1, key2, key3])
147 }
148
149 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
150 let value_layout = values.first().ok_or(ViewError::PostLoadValuesError)?;
151 let value_front = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
152 let value_back = values.get(2).ok_or(ViewError::PostLoadValuesError)?;
153 let front = from_bytes_option::<Vec<T>>(value_front)?;
154 let back = from_bytes_option::<Vec<T>>(value_back)?;
155 let layout = from_bytes_option_or_default::<BucketLayout>(value_layout)?;
156 let (stored_first_index, stored_num_buckets, stored_front, stored_back, cursor) =
159 match front {
160 Some(front) => {
161 let back = if layout.num_buckets >= 2 {
162 back.unwrap_or_default()
163 } else {
164 Vec::new()
165 };
166 let cursor = Cursor {
167 offset: 0,
168 position: layout.front_position as usize,
169 };
170 (
171 layout.first_index,
172 layout.num_buckets,
173 front,
174 back,
175 Some(cursor),
176 )
177 }
178 None => (0, 0, Vec::new(), Vec::new(), None),
179 };
180 Ok(Self {
181 context,
182 new_back_values: VecDeque::new(),
183 stored_first_index,
184 stored_num_buckets,
185 stored_front,
186 stored_back,
187 stored_front_position: layout.front_position,
188 cursor,
189 current_middle: None,
190 delete_storage_first: false,
191 })
192 }
193
194 fn rollback(&mut self) {
195 self.delete_storage_first = false;
200 self.cursor = (self.stored_num_buckets > 0).then_some(Cursor {
201 offset: 0,
202 position: self.stored_front_position as usize,
203 });
204 self.current_middle = None;
205 self.new_back_values.clear();
206 }
207
208 async fn has_pending_changes(&self) -> bool {
209 if self.delete_storage_first {
210 return true;
211 }
212 if self.stored_num_buckets > 0 {
213 let Some(cursor) = self.cursor else {
214 return true;
215 };
216 if cursor.offset != 0 || cursor.position != self.stored_front_position as usize {
217 return true;
218 }
219 }
220 !self.new_back_values.is_empty()
221 }
222
223 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
224 let plan = self.save_plan()?;
225 match plan.case {
226 SaveCase::Empty => {
227 if plan.has_storage {
228 batch.delete_key_prefix(self.context.base_key().bytes.clone());
229 }
230 Ok(true)
231 }
232 SaveCase::MetadataOnly => {
233 batch.put_key_value(
234 self.layout_key(),
235 &BucketLayout {
236 front_position: plan.cursor_position,
237 num_buckets: 1,
238 first_index: self.stored_first_index,
239 },
240 )?;
241 Ok(false)
242 }
243 SaveCase::Rewrite => {
244 if plan.has_storage {
245 batch.delete_key_prefix(self.context.base_key().bytes.clone());
246 }
247 let mut all_data = Vec::new();
248 if let Some(data) = self.current_front_data() {
249 all_data.extend(data[plan.cursor_position as usize..].iter().cloned());
250 }
251 all_data.extend(self.new_back_values.iter().cloned());
252 if all_data.is_empty() {
253 return Ok(true);
254 }
255 let first_index = 0;
256 let num_buckets = self.write_chunks(batch, &all_data, first_index)?;
257 batch.put_key_value(
258 self.layout_key(),
259 &BucketLayout {
260 front_position: 0,
261 num_buckets,
262 first_index,
263 },
264 )?;
265 Ok(false)
266 }
267 SaveCase::Patch {
268 new_first_index,
269 remaining_count,
270 } => {
271 for offset in 1..plan.remaining_offset {
276 let index = checked_bucket_index(self.stored_first_index, offset)?;
277 batch.delete_key(self.get_middle_key(index)?);
278 }
279 if plan.remaining_offset > 0 {
281 let data = self
282 .current_front_data()
283 .expect("Patch implies a live cursor within the stored buckets");
284 batch.put_key_value(self.front_key(), &data.to_vec())?;
285 batch.delete_key(self.get_middle_key(new_first_index)?);
286 }
287
288 let num_buckets = if self.new_back_values.is_empty() {
289 remaining_count
290 } else {
291 let mut merged = self.stored_back.clone();
293 merged.extend(self.new_back_values.iter().cloned());
294 let chunks = merged.chunks(N).collect::<Vec<_>>();
295 let num_new_chunks =
296 u32::try_from(chunks.len()).map_err(|_| ArithmeticError::Overflow)?;
297 let new_middle_start =
300 checked_bucket_index(new_first_index, remaining_count as usize - 1)?;
301 for (i, chunk) in chunks.iter().enumerate().take(chunks.len() - 1) {
302 let key =
303 self.get_middle_key(checked_bucket_index(new_middle_start, i)?)?;
304 batch.put_key_value(key, &chunk.to_vec())?;
305 }
306 batch.put_key_value(self.back_key(), &chunks.last().unwrap().to_vec())?;
307 (remaining_count - 1)
308 .checked_add(num_new_chunks)
309 .ok_or(ArithmeticError::Overflow)?
310 };
311
312 batch.put_key_value(
313 self.layout_key(),
314 &BucketLayout {
315 front_position: plan.cursor_position,
316 num_buckets,
317 first_index: new_first_index,
318 },
319 )?;
320 Ok(false)
321 }
322 }
323 }
324
325 fn post_save(&mut self) {
326 let plan = self.save_plan().expect("verified in pre_save");
327 self.delete_storage_first = false;
328 match plan.case {
329 SaveCase::Empty => {
330 self.stored_first_index = 0;
331 self.stored_num_buckets = 0;
332 self.stored_front = Vec::new();
333 self.stored_back = Vec::new();
334 self.cursor = None;
335 self.current_middle = None;
336 self.stored_front_position = 0;
337 }
338 SaveCase::MetadataOnly => {
339 self.cursor = Some(Cursor {
341 offset: 0,
342 position: plan.cursor_position as usize,
343 });
344 self.current_middle = None;
345 self.stored_front_position = plan.cursor_position;
346 }
347 SaveCase::Rewrite => {
348 let mut all_data = Vec::new();
349 if let Some(data) = self.current_front_data() {
350 all_data.extend(data[plan.cursor_position as usize..].iter().cloned());
351 }
352 all_data.extend(std::mem::take(&mut self.new_back_values));
353 let num_chunks = all_data.chunks(N).len();
356 self.stored_first_index = 0;
357 self.stored_num_buckets = u32::try_from(num_chunks).expect("verified in pre_save");
358 self.current_middle = None;
359 self.stored_front_position = 0;
360 if num_chunks == 0 {
361 self.stored_front = Vec::new();
362 self.stored_back = Vec::new();
363 self.cursor = None;
364 } else {
365 self.stored_back = if num_chunks >= 2 {
366 all_data[(num_chunks - 1) * N..].to_vec()
367 } else {
368 Vec::new()
369 };
370 all_data.truncate(N); self.stored_front = all_data;
372 self.cursor = Some(Cursor {
373 offset: 0,
374 position: 0,
375 });
376 }
377 }
378 SaveCase::Patch {
379 new_first_index,
380 remaining_count,
381 } => {
382 let cursor = self.cursor.expect("Patch implies a live cursor");
383 if plan.remaining_offset > 0 {
387 self.stored_front = self
388 .current_middle
389 .take()
390 .expect("the middle the cursor points into is loaded");
391 }
392 self.current_middle = None;
393 self.stored_first_index = new_first_index;
394 self.stored_num_buckets = if self.new_back_values.is_empty() {
395 remaining_count
396 } else {
397 let mut merged = std::mem::take(&mut self.stored_back);
400 merged.extend(std::mem::take(&mut self.new_back_values));
401 let num_new_chunks =
402 u32::try_from(merged.chunks(N).len()).expect("verified in pre_save");
403 let back_start = (num_new_chunks as usize - 1) * N;
404 self.stored_back = merged.split_off(back_start);
405 (remaining_count - 1) + num_new_chunks
406 };
407 self.cursor = Some(Cursor {
408 offset: 0,
409 position: cursor.position,
410 });
411 self.stored_front_position = plan.cursor_position;
412 }
413 }
414 }
415
416 fn clear(&mut self) {
417 self.delete_storage_first = true;
420 self.new_back_values.clear();
421 self.cursor = None;
422 self.current_middle = None;
423 }
424}
425
426impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
427where
428 Self: View,
429{
430 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
431 Ok(BucketQueueView {
432 context: self.context.clone(),
433 new_back_values: self.new_back_values.clone(),
434 stored_first_index: self.stored_first_index,
435 stored_num_buckets: self.stored_num_buckets,
436 stored_front: self.stored_front.clone(),
437 stored_back: self.stored_back.clone(),
438 stored_front_position: self.stored_front_position,
439 cursor: self.cursor,
440 current_middle: self.current_middle.clone(),
441 delete_storage_first: self.delete_storage_first,
442 })
443 }
444}
445
446#[derive(Debug)]
450enum SaveCase {
451 Empty,
453 MetadataOnly,
456 Rewrite,
459 Patch {
463 new_first_index: u32,
464 remaining_count: u32,
465 },
466}
467
468#[derive(Debug)]
469struct SavePlan {
470 case: SaveCase,
471 remaining_offset: usize,
474 cursor_position: u32,
478 has_storage: bool,
480}
481
482fn checked_bucket_index(base: u32, offset: usize) -> Result<u32, ArithmeticError> {
488 let offset = u32::try_from(offset).map_err(|_| ArithmeticError::Overflow)?;
489 base.checked_add(offset).ok_or(ArithmeticError::Overflow)
490}
491
492impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
493 fn front_key(&self) -> Vec<u8> {
494 self.context.base_key().base_tag(KeyTag::Front as u8)
495 }
496
497 fn back_key(&self) -> Vec<u8> {
498 self.context.base_key().base_tag(KeyTag::Back as u8)
499 }
500
501 fn layout_key(&self) -> Vec<u8> {
502 self.context.base_key().base_tag(KeyTag::Layout as u8)
503 }
504
505 fn get_middle_key(&self, index: u32) -> Result<Vec<u8>, ViewError> {
507 Ok(self
508 .context
509 .base_key()
510 .derive_tag_key(KeyTag::Middle as u8, &index)?)
511 }
512
513 fn current_front_data(&self) -> Option<&[T]> {
516 let cursor = self.cursor?;
517 let num_buckets = self.stored_num_buckets as usize;
518 Some(if cursor.offset == 0 {
519 &self.stored_front
520 } else if cursor.offset + 1 == num_buckets {
521 &self.stored_back
522 } else {
523 self.current_middle
524 .as_deref()
525 .expect("the middle bucket the cursor points into is loaded")
526 })
527 }
528
529 fn bucket_len(&self, offset: usize) -> usize {
532 if offset == 0 {
533 self.stored_front.len()
534 } else if offset + 1 == self.stored_num_buckets as usize {
535 self.stored_back.len()
536 } else {
537 N
538 }
539 }
540
541 fn save_plan(&self) -> Result<SavePlan, ViewError> {
545 let num_buckets = self.stored_num_buckets as usize;
546 let remaining_offset = if self.delete_storage_first {
547 num_buckets
548 } else {
549 self.cursor.map_or(num_buckets, |c| c.offset)
550 };
551 let remaining_count = num_buckets - remaining_offset;
552 let cursor_position = u32::try_from(self.cursor.map_or(0, |c| c.position))
553 .map_err(|_| ArithmeticError::Overflow)?;
554 let has_storage = self.stored_num_buckets > 0 || self.delete_storage_first;
555 let new_back_empty = self.new_back_values.is_empty();
556
557 let case = if remaining_count == 0 && new_back_empty {
558 SaveCase::Empty
559 } else if remaining_count == 1 && remaining_offset == 0 && new_back_empty {
560 SaveCase::MetadataOnly
561 } else if remaining_count <= 1 {
562 SaveCase::Rewrite
563 } else {
564 SaveCase::Patch {
565 new_first_index: checked_bucket_index(self.stored_first_index, remaining_offset)?,
566 remaining_count: u32::try_from(remaining_count)
567 .map_err(|_| ArithmeticError::Overflow)?,
568 }
569 };
570 Ok(SavePlan {
571 case,
572 remaining_offset,
573 cursor_position,
574 has_storage,
575 })
576 }
577
578 fn write_chunks(
583 &self,
584 batch: &mut Batch,
585 data: &[T],
586 first_index: u32,
587 ) -> Result<u32, ViewError>
588 where
589 T: Serialize + Clone,
590 {
591 let chunks = data.chunks(N).collect::<Vec<_>>();
592 let num_buckets = u32::try_from(chunks.len()).map_err(|_| ArithmeticError::Overflow)?;
593 batch.put_key_value(self.front_key(), &chunks[0].to_vec())?;
594 for (i, chunk) in chunks
595 .iter()
596 .enumerate()
597 .skip(1)
598 .take(chunks.len().saturating_sub(2))
599 {
600 let key = self.get_middle_key(checked_bucket_index(first_index, i)?)?;
601 batch.put_key_value(key, &chunk.to_vec())?;
602 }
603 if num_buckets >= 2 {
604 batch.put_key_value(self.back_key(), &chunks.last().unwrap().to_vec())?;
605 }
606 Ok(num_buckets)
607 }
608
609 fn stored_count(&self) -> usize {
611 if self.delete_storage_first {
612 return 0;
613 }
614 let Some(cursor) = self.cursor else {
615 return 0;
616 };
617 let remaining = self.stored_num_buckets as usize - cursor.offset;
618 let front_count = self.bucket_len(cursor.offset) - cursor.position;
620 if remaining == 1 {
621 return front_count;
622 }
623 let back_count = self.stored_back.len();
625 let num_middles = remaining - 2;
626 front_count + num_middles * N + back_count
627 }
628
629 pub fn count(&self) -> usize {
642 self.stored_count() + self.new_back_values.len()
643 }
644}
645
646impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
647 pub fn front(&self) -> Option<&T> {
661 match self.cursor {
662 Some(cursor) => {
663 let data = self
664 .current_front_data()
665 .expect("cursor is Some, so the current front is available");
666 Some(&data[cursor.position])
667 }
668 None => self.new_back_values.front(),
669 }
670 }
671
672 pub fn front_mut(&mut self) -> Option<&mut T> {
688 match self.cursor {
689 Some(cursor) => {
690 let num_buckets = self.stored_num_buckets as usize;
691 let data = if cursor.offset == 0 {
692 &mut self.stored_front
693 } else if cursor.offset + 1 == num_buckets {
694 &mut self.stored_back
695 } else {
696 self.current_middle
697 .as_mut()
698 .expect("the middle bucket the cursor points into is loaded")
699 };
700 Some(
701 data.get_mut(cursor.position)
702 .expect("cursor.position must be a valid position within the front bucket"),
703 )
704 }
705 None => self.new_back_values.front_mut(),
706 }
707 }
708
709 pub async fn delete_front(&mut self) -> Result<(), ViewError> {
723 let Some(cursor) = self.cursor else {
724 self.new_back_values.pop_front();
725 return Ok(());
726 };
727 let current_len = self
728 .current_front_data()
729 .expect("cursor points into the stored buckets")
730 .len();
731 let num_buckets = self.stored_num_buckets as usize;
732 let mut offset = cursor.offset;
733 let mut position = cursor.position + 1;
734 if position == current_len {
735 offset += 1;
736 position = 0;
737 if offset == num_buckets {
738 self.cursor = None;
740 self.current_middle = None;
741 return Ok(());
742 }
743 if offset + 1 == num_buckets {
747 self.current_middle = None;
749 } else {
750 let index = checked_bucket_index(self.stored_first_index, offset)?;
751 let key = self.get_middle_key(index)?;
752 let data = self
753 .context
754 .store()
755 .read_value(&key)
756 .await?
757 .ok_or_else(|| {
758 ViewError::MissingEntries("BucketQueueView::delete_front".into())
759 })?;
760 self.current_middle = Some(data);
761 }
762 }
763 self.cursor = Some(Cursor { offset, position });
764 Ok(())
765 }
766
767 pub fn push_back(&mut self, value: T) {
780 self.new_back_values.push_back(value);
781 }
782
783 pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
797 let count = self.count();
798 self.read_context(self.cursor, count).await
799 }
800
801 pub async fn back(&mut self) -> Result<Option<T>, ViewError>
815 where
816 T: Clone,
817 {
818 if let Some(value) = self.new_back_values.back() {
819 return Ok(Some(value.clone()));
820 }
821 if self.cursor.is_none() {
822 return Ok(None);
823 }
824 let last = if self.stored_num_buckets >= 2 {
827 self.stored_back.last()
828 } else {
829 self.stored_front.last()
830 };
831 Ok(last.cloned())
832 }
833
834 async fn read_context(
835 &self,
836 cursor: Option<Cursor>,
837 count: usize,
838 ) -> Result<Vec<T>, ViewError> {
839 if count == 0 {
840 return Ok(Vec::new());
841 }
842 let mut elements = Vec::<T>::new();
843 let mut count_remain = count;
844 if let Some(cursor) = cursor {
845 let num_buckets = self.stored_num_buckets as usize;
846 let mut keys = Vec::new();
848 let mut position = cursor.position;
849 let mut remain = count;
850 for offset in cursor.offset..num_buckets {
851 if offset != 0 && offset + 1 != num_buckets {
852 let index = checked_bucket_index(self.stored_first_index, offset)?;
853 keys.push(self.get_middle_key(index)?);
854 }
855 let size = self.bucket_len(offset) - position;
856 if size >= remain {
857 break;
858 }
859 remain -= size;
860 position = 0;
861 }
862 let values = self.context.store().read_multi_values_bytes(&keys).await?;
863 let mut value_pos = 0;
865 let mut position = cursor.position;
866 for offset in cursor.offset..num_buckets {
867 let read_buf;
868 let data: &[T] = if offset == 0 {
869 &self.stored_front
870 } else if offset + 1 == num_buckets {
871 &self.stored_back
872 } else {
873 let value = values[value_pos].as_ref().ok_or_else(|| {
874 ViewError::MissingEntries("BucketQueueView::read_context".into())
875 })?;
876 value_pos += 1;
877 read_buf = bcs::from_bytes::<Vec<T>>(value)?;
878 &read_buf
879 };
880 let size = data.len() - position;
881 elements.extend(data[position..].iter().take(count_remain).cloned());
882 if size >= count_remain {
883 return Ok(elements);
884 }
885 count_remain -= size;
886 position = 0;
887 }
888 }
889 let count_read = std::cmp::min(count_remain, self.new_back_values.len());
890 elements.extend(self.new_back_values.range(0..count_read).cloned());
891 Ok(elements)
892 }
893
894 pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
909 let count = std::cmp::min(count, self.count());
910 self.read_context(self.cursor, count).await
911 }
912
913 pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
928 let count = std::cmp::min(count, self.count());
929 if count <= self.new_back_values.len() {
930 let start = self.new_back_values.len() - count;
931 Ok(self
932 .new_back_values
933 .range(start..)
934 .cloned()
935 .collect::<Vec<_>>())
936 } else {
937 let mut increment = self.count() - count;
938 let Some(cursor) = self.cursor else {
939 unreachable!("Cursor should be Some when stored_count > 0");
940 };
941 let num_buckets = self.stored_num_buckets as usize;
942 let mut position = cursor.position;
943 for offset in cursor.offset..num_buckets {
944 let size = self.bucket_len(offset) - position;
945 if increment < size {
946 return self
947 .read_context(
948 Some(Cursor {
949 offset,
950 position: position + increment,
951 }),
952 count,
953 )
954 .await;
955 }
956 increment -= size;
957 position = 0;
958 }
959 unreachable!("BucketQueueView::read_back: iterated past all stored buckets without finding the requested position");
960 }
961 }
962
963 async fn load_all(&mut self) -> Result<(), ViewError> {
964 if !self.delete_storage_first {
965 let elements = self.elements().await?;
966 self.new_back_values.clear();
967 for elt in elements {
968 self.new_back_values.push_back(elt);
969 }
970 self.cursor = None;
971 self.current_middle = None;
972 self.delete_storage_first = true;
973 }
974 Ok(())
975 }
976
977 pub async fn try_iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
993 self.load_all().await?;
994 Ok(self.new_back_values.iter_mut())
995 }
996}
997
998impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
999 for BucketQueueView<C, T, N>
1000where
1001 Self: View,
1002{
1003 type Hasher = sha3::Sha3_256;
1004
1005 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1006 self.hash().await
1007 }
1008
1009 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1010 #[cfg(with_metrics)]
1011 let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
1012 let elements = self.elements().await?;
1013 let mut hasher = sha3::Sha3_256::default();
1014 hasher.update_with_bcs_bytes(&elements)?;
1015 Ok(hasher.finalize())
1016 }
1017}
1018
1019pub type HashedBucketQueueView<C, T, const N: usize> =
1021 WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
1022
1023pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
1025 HistoricallyHashableView<C, BucketQueueView<C, T, N>>;
1026
1027#[cfg(with_graphql)]
1028mod graphql {
1029 use std::borrow::Cow;
1030
1031 use linera_base::data_types::ArithmeticError;
1032
1033 use super::BucketQueueView;
1034 use crate::{
1035 context::Context,
1036 graphql::{hash_name, mangle},
1037 };
1038
1039 impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
1040 for BucketQueueView<C, T, N>
1041 {
1042 fn type_name() -> Cow<'static, str> {
1043 format!(
1044 "BucketQueueView_{}_{:08x}",
1045 mangle(T::type_name()),
1046 hash_name::<T>()
1047 )
1048 .into()
1049 }
1050 }
1051
1052 #[async_graphql::Object(cache_control(no_cache), name_type)]
1053 impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
1054 where
1055 C: Send + Sync,
1056 T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
1057 {
1058 #[graphql(derived(name = "count"))]
1059 async fn count_(&self) -> Result<u32, async_graphql::Error> {
1060 Ok(u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?)
1061 }
1062
1063 async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
1064 Ok(self
1065 .read_front(count.unwrap_or_else(|| self.count()))
1066 .await?)
1067 }
1068 }
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073 use super::*;
1074 use crate::{
1075 batch::Batch,
1076 context::{Context, MemoryContext},
1077 store::WritableKeyValueStore as _,
1078 };
1079
1080 #[tokio::test]
1085 async fn delete_front_load_failure_preserves_invariant() -> Result<(), ViewError> {
1086 let context = MemoryContext::new_for_testing(());
1087 let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;
1088 for value in [1u8, 2, 3, 4, 5, 6] {
1090 view.push_back(value);
1091 }
1092 save(&context, &mut view).await?;
1093
1094 let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;
1095
1096 let middle_key = view.get_middle_key(1)?;
1098 let mut batch = Batch::new();
1099 batch.delete_key(middle_key);
1100 context.store().write_batch(batch).await?;
1101
1102 view.delete_front().await?;
1103 let err = view.delete_front().await.expect_err("load should fail");
1104 assert!(matches!(err, ViewError::MissingEntries(_)));
1105
1106 save(&context, &mut view).await?;
1107
1108 Ok(())
1109 }
1110
1111 #[tokio::test]
1116 async fn save_load_roundtrip_across_sizes() -> Result<(), ViewError> {
1117 const N: usize = 3;
1118 for size in [0usize, 1, 2, N, N + 1, 2 * N, 2 * N + 1, 5 * N, 5 * N - 1] {
1119 let context = MemoryContext::new_for_testing(());
1120 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1121 for i in 0..u32::try_from(size).unwrap() {
1122 view.push_back(i);
1123 }
1124 save(&context, &mut view).await?;
1125
1126 let reloaded = BucketQueueView::<_, u32, N>::load(context).await?;
1127 let elements = reloaded.elements().await?;
1128 let expected = (0..u32::try_from(size).unwrap()).collect::<Vec<_>>();
1129 assert_eq!(elements, expected, "size = {size}");
1130 assert_eq!(reloaded.count(), size, "count for size = {size}");
1131 }
1132 Ok(())
1133 }
1134
1135 #[tokio::test]
1139 async fn middle_buckets_are_always_full() -> Result<(), ViewError> {
1140 const N: usize = 4;
1141 let context = MemoryContext::new_for_testing(());
1142 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1143 for i in 0..u32::try_from(5 * N + 2).unwrap() {
1145 view.push_back(i);
1146 }
1147 save(&context, &mut view).await?;
1148
1149 view.push_back(1000);
1151 view.push_back(1001);
1152 save(&context, &mut view).await?;
1153
1154 for _ in 0..(N - 1) {
1156 view.delete_front().await?;
1157 }
1158 save(&context, &mut view).await?;
1159
1160 let view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1162 let first_index = view.stored_first_index;
1163 for offset in 1..view.stored_num_buckets.saturating_sub(1) {
1164 let key = view.get_middle_key(first_index + offset)?;
1165 let data = context
1166 .store()
1167 .read_value::<Vec<u32>>(&key)
1168 .await?
1169 .expect("middle bucket should be present in storage");
1170 assert_eq!(
1171 data.len(),
1172 N,
1173 "middle at offset {offset} should hold N elements"
1174 );
1175 }
1176 Ok(())
1177 }
1178
1179 #[tokio::test]
1182 async fn stored_count_is_exact() -> Result<(), ViewError> {
1183 const N: usize = 3;
1184 let context = MemoryContext::new_for_testing(());
1185 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1186 for i in 0..7u32 {
1188 view.push_back(i);
1189 }
1190 save(&context, &mut view).await?;
1191
1192 let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1193 assert_eq!(view.stored_count(), 7);
1194 view.delete_front().await?; assert_eq!(view.stored_count(), 6);
1196 view.delete_front().await?; assert_eq!(view.stored_count(), 5);
1198 view.delete_front().await?; assert_eq!(view.stored_count(), 4);
1200 Ok(())
1201 }
1202
1203 #[tokio::test]
1208 async fn save_plan_covers_each_case() -> Result<(), ViewError> {
1209 const N: usize = 3;
1210
1211 let context = MemoryContext::new_for_testing(());
1213 let view = BucketQueueView::<_, u32, N>::load(context).await?;
1214 assert!(matches!(view.save_plan()?.case, SaveCase::Empty));
1215
1216 let context = MemoryContext::new_for_testing(());
1218 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1219 view.push_back(10);
1220 view.push_back(20);
1221 save(&context, &mut view).await?;
1222 let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1223 view.delete_front().await?;
1224 assert!(matches!(view.save_plan()?.case, SaveCase::MetadataOnly));
1225
1226 let context = MemoryContext::new_for_testing(());
1229 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1230 for i in 0..5u32 {
1231 view.push_back(i);
1232 }
1233 save(&context, &mut view).await?;
1234 let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1235 for _ in 0..N {
1236 view.delete_front().await?;
1237 }
1238 assert!(matches!(view.save_plan()?.case, SaveCase::Rewrite));
1239
1240 let context = MemoryContext::new_for_testing(());
1243 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1244 for i in 0..7u32 {
1245 view.push_back(i);
1246 }
1247 save(&context, &mut view).await?;
1248 let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1249 view.push_back(100);
1250 assert!(matches!(view.save_plan()?.case, SaveCase::Patch { .. }));
1251
1252 Ok(())
1253 }
1254
1255 #[tokio::test]
1262 async fn patch_promotes_front_keeps_middles_and_rechunks() -> Result<(), ViewError> {
1263 const N: usize = 3;
1264 let context = MemoryContext::new_for_testing(());
1265 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1266 for i in 0..14u32 {
1268 view.push_back(i);
1269 }
1270 save(&context, &mut view).await?;
1271
1272 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1273 for _ in 0..4 {
1276 view.delete_front().await?;
1277 }
1278 for v in 100..106u32 {
1281 view.push_back(v);
1282 }
1283
1284 let plan = view.save_plan()?;
1287 assert_eq!(plan.remaining_offset, 1);
1288 assert!(matches!(
1289 plan.case,
1290 SaveCase::Patch {
1291 remaining_count: 4,
1292 ..
1293 }
1294 ));
1295
1296 save(&context, &mut view).await?;
1297
1298 let view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1300 let expected: Vec<u32> = (4..14).chain(100..106).collect();
1301 assert_eq!(view.elements().await?, expected);
1302 assert_eq!(view.count(), expected.len());
1303
1304 assert_eq!(view.stored_num_buckets, 6);
1307 let first_index = view.stored_first_index;
1308 for offset in 1..view.stored_num_buckets - 1 {
1309 let key = view.get_middle_key(first_index + offset)?;
1310 let data = context
1311 .store()
1312 .read_value::<Vec<u32>>(&key)
1313 .await?
1314 .expect("middle bucket should be present in storage");
1315 assert_eq!(data.len(), N, "middle at offset {offset} must hold N");
1316 }
1317 Ok(())
1318 }
1319
1320 #[tokio::test]
1324 async fn n_equals_one_roundtrip() -> Result<(), ViewError> {
1325 let context = MemoryContext::new_for_testing(());
1326 let mut view = BucketQueueView::<_, u32, 1>::load(context.clone()).await?;
1327 for i in 0..5u32 {
1328 view.push_back(i);
1329 }
1330 save(&context, &mut view).await?;
1331
1332 let mut view = BucketQueueView::<_, u32, 1>::load(context.clone()).await?;
1333 assert_eq!(view.elements().await?, vec![0, 1, 2, 3, 4]);
1334 view.delete_front().await?;
1335 view.delete_front().await?;
1336 view.push_back(99);
1337 save(&context, &mut view).await?;
1338
1339 let view = BucketQueueView::<_, u32, 1>::load(context).await?;
1340 assert_eq!(view.elements().await?, vec![2, 3, 4, 99]);
1341 Ok(())
1342 }
1343
1344 #[tokio::test]
1347 async fn rollback_restores_saved_state() -> Result<(), ViewError> {
1348 const N: usize = 3;
1349 let context = MemoryContext::new_for_testing(());
1350 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1351 for i in 0..5u32 {
1352 view.push_back(i);
1353 }
1354 save(&context, &mut view).await?;
1355
1356 let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1357 view.delete_front().await?;
1358 view.delete_front().await?;
1359 view.push_back(100);
1360 view.push_back(101);
1361 assert_eq!(view.elements().await?, vec![2, 3, 4, 100, 101]);
1362
1363 view.rollback();
1364 assert_eq!(view.elements().await?, vec![0, 1, 2, 3, 4]);
1365
1366 assert!(!view.has_pending_changes().await);
1368 Ok(())
1369 }
1370
1371 #[tokio::test]
1376 async fn rollback_after_clear_restores_front_position() -> Result<(), ViewError> {
1377 const N: usize = 5;
1378 let context = MemoryContext::new_for_testing(());
1379 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1380 for value in [10u32, 20, 30] {
1381 view.push_back(value);
1382 }
1383 save(&context, &mut view).await?;
1384
1385 let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1387 view.delete_front().await?; save(&context, &mut view).await?;
1389
1390 let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1391 assert_eq!(view.elements().await?, vec![20, 30]);
1392
1393 view.clear();
1394 assert_eq!(view.elements().await?, Vec::<u32>::new());
1395
1396 view.rollback();
1397 assert_eq!(view.elements().await?, vec![20, 30]);
1398 assert!(!view.has_pending_changes().await);
1399 Ok(())
1400 }
1401
1402 async fn save<V: View>(context: &V::Context, view: &mut V) -> Result<(), ViewError> {
1403 let mut batch = Batch::new();
1404 view.pre_save(&mut batch)?;
1405 context.store().write_batch(batch).await?;
1406 view.post_save();
1407 Ok(())
1408 }
1409}