Skip to main content

linera_views/views/
bucket_queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{vec_deque::IterMut, VecDeque};
5
6use allocative::Allocative;
7use linera_base::data_types::ArithmeticError;
8#[cfg(with_metrics)]
9use linera_base::prometheus_util::MeasureLatency as _;
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11
12use crate::{
13    batch::Batch,
14    common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
15    context::Context,
16    hashable_wrapper::WrappedHashableContainerView,
17    historical_hash_wrapper::HistoricallyHashableView,
18    store::ReadableKeyValueStore as _,
19    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
20};
21
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram recording the runtime of the hash computation of a `BucketQueueView`.
    ///
    /// Registered lazily on first access, without any label.
    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
        LazyLock::new(register_hash_runtime_histogram);

    /// Registers the hash-runtime histogram and returns its handle.
    fn register_hash_runtime_histogram() -> HistogramVec {
        let name = "bucket_queue_view_hash_runtime";
        let description = "BucketQueueView hash runtime";
        let buckets = exponential_bucket_latencies(5.0);
        register_histogram_vec(name, description, &[], buckets)
    }
}
39
/// Key tags to create the sub-keys of a [`BucketQueueView`] on top of the base key.
///
/// The discriminants are consecutive `u8` values starting at `MIN_VIEW_TAG`; a variant
/// is cast with `as u8` wherever a storage key is derived from it.
#[repr(u8)]
enum KeyTag {
    /// Key tag for the `BucketLayout` metadata.
    Layout = MIN_VIEW_TAG,
    /// Key tag for the front bucket.
    Front,
    /// Key tag for the content of middle buckets; the bucket's storage index is
    /// appended when the key is derived.
    Middle,
    /// Key tag for the back bucket.
    Back,
}
52
/// The metadata of the view in storage, saved under the `KeyTag::Layout` key.
///
/// This is O(1) in size regardless of the number of buckets. The invariant is that
/// all middle buckets (i.e. neither front nor back) have exactly N elements. Only
/// the front bucket (tracked by `front_position`) and back bucket can be partial.
///
/// When no layout is stored, loading falls back to the `Default` value (all zeros).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct BucketLayout {
    /// The position of the front value in the front bucket. Elements of the front
    /// bucket located before this position have already been consumed.
    front_position: u32,
    /// The total number of stored buckets (front, middles and back together).
    num_buckets: u32,
    /// The logical index of the front bucket. Middle bucket at position `p` (1-indexed
    /// from front) has storage key `KeyTag::Middle + (first_index + p)`.
    first_index: u32,
}
68
/// The position of the queue's front value within the stored buckets.
///
/// This is the live, in-memory counterpart of the saved `front_position`: it moves
/// forward as front values are deleted and is reset from the saved layout on rollback.
#[derive(Copy, Clone, Debug, Allocative)]
struct Cursor {
    /// The offset of the current front bucket from the front bucket of the saved
    /// layout (`0` = the saved `stored_front` bucket).
    offset: usize,
    /// The position of the value within that bucket.
    position: usize,
}
78
/// A view that supports a FIFO queue for values of type `T`.
///
/// Values are stored in buckets of at most `N` elements. The size `N` has to be chosen
/// by taking into account the size of the type `T` and the basic size of a block. For
/// example a total size of 100 bytes to 10 KB seems adequate.
///
/// Only the endpoints of the stored sequence are materialized: `stored_front` and
/// `stored_back` are kept in memory, while the middle buckets live solely in storage
/// (each holds exactly `N` elements, so they are tracked by `stored_num_buckets` alone
/// and read back lazily). The sole exception is `current_middle`, which holds the bucket
/// the cursor currently points into once `delete_front` has advanced past the front.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative, const N: usize")]
pub struct BucketQueueView<C, T, const N: usize> {
    /// The view context.
    #[allocative(skip)]
    context: C,
    /// The newly inserted back values, not yet persisted.
    new_back_values: VecDeque<T>,
    /// Storage index of the front bucket (the bucket at cursor offset `0`).
    stored_first_index: u32,
    /// The total number of stored buckets (front + middles + back).
    stored_num_buckets: u32,
    /// The front bucket's data, fully materialized; empty iff `stored_num_buckets == 0`.
    ///
    /// This is the saved front bucket and the anchor that `rollback` restores to: it is
    /// never mutated by `delete_front`, so `rollback` (which is synchronous and reads
    /// neither storage nor the live cursor) can rebuild the cursor from it together with
    /// `stored_front_position`. In-memory mutations such as `clear` must leave it intact.
    stored_front: Vec<T>,
    /// The back bucket's data, fully materialized; empty when `stored_num_buckets < 2`
    /// (a single stored bucket is represented by `stored_front` alone).
    stored_back: Vec<T>,
    /// Position of the front value within `stored_front`, as of the last save.
    stored_front_position: u32,
    /// The live cursor over the stored buckets, or `None` when no stored value remains
    /// (so the front, if any, is the first of `new_back_values`).
    ///
    /// `None` does not imply `stored_num_buckets == 0`: after enough `delete_front`s the
    /// cursor walks off the end and becomes `None` while the saved layout stays untouched
    /// until the next save.
    cursor: Option<Cursor>,
    /// The data of the middle bucket the cursor currently points into, materialized when
    /// `delete_front` has advanced strictly inside the middles (`0 < cursor.offset <
    /// stored_num_buckets - 1`). `None` otherwise.
    current_middle: Option<Vec<T>>,
    /// Whether the existing storage has to be wiped by the next save (set by `clear`,
    /// reset by `rollback` and `post_save`).
    delete_storage_first: bool,
}
128
129impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
130where
131    C: Context,
132    T: Send + Sync + Clone + Serialize + DeserializeOwned,
133{
    /// Number of keys requested by `pre_load`: the layout, the front bucket and the
    /// back bucket.
    const NUM_INIT_KEYS: usize = 3;

    /// The context type; it provides the base key and the store used by this view.
    type Context = C;

    /// Returns a clone of the view's context.
    fn context(&self) -> C {
        self.context.clone()
    }
141
142    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
143        let key1 = context.base_key().base_tag(KeyTag::Layout as u8);
144        let key2 = context.base_key().base_tag(KeyTag::Front as u8);
145        let key3 = context.base_key().base_tag(KeyTag::Back as u8);
146        Ok(vec![key1, key2, key3])
147    }
148
149    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
150        let value_layout = values.first().ok_or(ViewError::PostLoadValuesError)?;
151        let value_front = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
152        let value_back = values.get(2).ok_or(ViewError::PostLoadValuesError)?;
153        let front = from_bytes_option::<Vec<T>>(value_front)?;
154        let back = from_bytes_option::<Vec<T>>(value_back)?;
155        let layout = from_bytes_option_or_default::<BucketLayout>(value_layout)?;
156        // The front bucket is present iff there is stored data. The middle buckets are
157        // read back lazily and the back bucket is materialized only when `num_buckets >= 2`.
158        let (stored_first_index, stored_num_buckets, stored_front, stored_back, cursor) =
159            match front {
160                Some(front) => {
161                    let back = if layout.num_buckets >= 2 {
162                        back.unwrap_or_default()
163                    } else {
164                        Vec::new()
165                    };
166                    let cursor = Cursor {
167                        offset: 0,
168                        position: layout.front_position as usize,
169                    };
170                    (
171                        layout.first_index,
172                        layout.num_buckets,
173                        front,
174                        back,
175                        Some(cursor),
176                    )
177                }
178                None => (0, 0, Vec::new(), Vec::new(), None),
179            };
180        Ok(Self {
181            context,
182            new_back_values: VecDeque::new(),
183            stored_first_index,
184            stored_num_buckets,
185            stored_front,
186            stored_back,
187            stored_front_position: layout.front_position,
188            cursor,
189            current_middle: None,
190            delete_storage_first: false,
191        })
192    }
193
194    fn rollback(&mut self) {
195        // The saved layout (`stored_front`, `stored_back`, `stored_first_index`,
196        // `stored_num_buckets`, `stored_front_position`) is never mutated outside of a
197        // save, so restoring the last-saved state only requires resetting the live cursor
198        // and dropping the pending back values.
199        self.delete_storage_first = false;
200        self.cursor = (self.stored_num_buckets > 0).then_some(Cursor {
201            offset: 0,
202            position: self.stored_front_position as usize,
203        });
204        self.current_middle = None;
205        self.new_back_values.clear();
206    }
207
208    async fn has_pending_changes(&self) -> bool {
209        if self.delete_storage_first {
210            return true;
211        }
212        if self.stored_num_buckets > 0 {
213            let Some(cursor) = self.cursor else {
214                return true;
215            };
216            if cursor.offset != 0 || cursor.position != self.stored_front_position as usize {
217                return true;
218            }
219        }
220        !self.new_back_values.is_empty()
221    }
222
    /// Stages in `batch` the storage operations that persist the pending changes.
    ///
    /// The shape of the work is decided by `save_plan`:
    /// * `Empty`: wipe the keys of the view (if any storage exists).
    /// * `MetadataOnly`: rewrite the layout entry only.
    /// * `Rewrite`: wipe the keys, then write the surviving and new values from scratch.
    /// * `Patch`: drop the consumed buckets, promote the new front bucket if the cursor
    ///   crossed buckets, and merge the new values into the back.
    ///
    /// Returns `Ok(true)` when the save leaves nothing stored for this view (the `Empty`
    /// case, or a `Rewrite` without any value left) and `Ok(false)` otherwise.
    ///
    /// # Errors
    ///
    /// Propagates the failures of `save_plan`, of the key derivation, of the value
    /// serialization and of the checked bucket-index arithmetic.
    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
        let plan = self.save_plan()?;
        match plan.case {
            SaveCase::Empty => {
                if plan.has_storage {
                    batch.delete_key_prefix(self.context.base_key().bytes.clone());
                }
                Ok(true)
            }
            SaveCase::MetadataOnly => {
                // The single stored bucket is untouched; only the front position moved.
                batch.put_key_value(
                    self.layout_key(),
                    &BucketLayout {
                        front_position: plan.cursor_position,
                        num_buckets: 1,
                        first_index: self.stored_first_index,
                    },
                )?;
                Ok(false)
            }
            SaveCase::Rewrite => {
                // The prefix deletion is staged before the new entries are written.
                if plan.has_storage {
                    batch.delete_key_prefix(self.context.base_key().bytes.clone());
                }
                // Gather what survives of the current front bucket, then the new values.
                let mut all_data = Vec::new();
                if let Some(data) = self.current_front_data() {
                    all_data.extend(data[plan.cursor_position as usize..].iter().cloned());
                }
                all_data.extend(self.new_back_values.iter().cloned());
                if all_data.is_empty() {
                    return Ok(true);
                }
                // The rewritten data starts at position 0 of a front bucket with index 0.
                let first_index = 0;
                let num_buckets = self.write_chunks(batch, &all_data, first_index)?;
                batch.put_key_value(
                    self.layout_key(),
                    &BucketLayout {
                        front_position: 0,
                        num_buckets,
                        first_index,
                    },
                )?;
                Ok(false)
            }
            SaveCase::Patch {
                new_first_index,
                remaining_count,
            } => {
                // Delete the keys of the consumed middle buckets (relative offsets
                // 1..remaining_offset). Offset 0 was the old front (its KeyTag::Front is
                // overwritten below if the front moved). The back is preserved since
                // remaining_count >= 2.
                for offset in 1..plan.remaining_offset {
                    let index = checked_bucket_index(self.stored_first_index, offset)?;
                    batch.delete_key(self.get_middle_key(index)?);
                }
                // Promote the new front bucket if the cursor crossed buckets.
                if plan.remaining_offset > 0 {
                    let data = self
                        .current_front_data()
                        .expect("Patch implies a live cursor within the stored buckets");
                    batch.put_key_value(self.front_key(), &data.to_vec())?;
                    // The promoted bucket now lives under the front key, so its former
                    // middle key is removed.
                    batch.delete_key(self.get_middle_key(new_first_index)?);
                }

                let num_buckets = if self.new_back_values.is_empty() {
                    remaining_count
                } else {
                    // Merge old back + new values, re-chunk into full-N middles + new back.
                    let mut merged = self.stored_back.clone();
                    merged.extend(self.new_back_values.iter().cloned());
                    let chunks = merged.chunks(N).collect::<Vec<_>>();
                    let num_new_chunks =
                        u32::try_from(chunks.len()).map_err(|_| ArithmeticError::Overflow)?;
                    // The old back becomes the first re-chunked bucket; it sits just past the
                    // surviving middles (`remaining_count - 1` buckets after the new front).
                    let new_middle_start =
                        checked_bucket_index(new_first_index, remaining_count as usize - 1)?;
                    // Every chunk but the last one is a full bucket stored as a middle.
                    for (i, chunk) in chunks.iter().enumerate().take(chunks.len() - 1) {
                        let key =
                            self.get_middle_key(checked_bucket_index(new_middle_start, i)?)?;
                        batch.put_key_value(key, &chunk.to_vec())?;
                    }
                    // `merged` holds at least the new values, so a last chunk exists.
                    batch.put_key_value(self.back_key(), &chunks.last().unwrap().to_vec())?;
                    // The old back is replaced by the re-chunked buckets.
                    (remaining_count - 1)
                        .checked_add(num_new_chunks)
                        .ok_or(ArithmeticError::Overflow)?
                };

                batch.put_key_value(
                    self.layout_key(),
                    &BucketLayout {
                        front_position: plan.cursor_position,
                        num_buckets,
                        first_index: new_first_index,
                    },
                )?;
                Ok(false)
            }
        }
    }
324
    /// Updates the in-memory state so that it mirrors what `pre_save` staged.
    ///
    /// The plan is recomputed from the same, still unchanged, state that `pre_save`
    /// used, so both methods follow the same case. After this call the saved layout
    /// fields describe the new stored content, the cursor points at the saved front
    /// position (or is `None` when nothing is stored), the pending back values are
    /// consumed where they were written, and the clear flag is reset.
    ///
    /// # Panics
    ///
    /// Panics if `save_plan` fails or if an invariant already checked by `pre_save`
    /// does not hold.
    fn post_save(&mut self) {
        let plan = self.save_plan().expect("verified in pre_save");
        self.delete_storage_first = false;
        match plan.case {
            SaveCase::Empty => {
                // Nothing is stored anymore: back to the state of a freshly created view.
                self.stored_first_index = 0;
                self.stored_num_buckets = 0;
                self.stored_front = Vec::new();
                self.stored_back = Vec::new();
                self.cursor = None;
                self.current_middle = None;
                self.stored_front_position = 0;
            }
            SaveCase::MetadataOnly => {
                // The single front bucket survives unchanged; only the cursor advanced.
                self.cursor = Some(Cursor {
                    offset: 0,
                    position: plan.cursor_position as usize,
                });
                self.current_middle = None;
                self.stored_front_position = plan.cursor_position;
            }
            SaveCase::Rewrite => {
                // Rebuild the same sequence that `pre_save` wrote: the rest of the
                // current front bucket followed by the new values.
                let mut all_data = Vec::new();
                if let Some(data) = self.current_front_data() {
                    all_data.extend(data[plan.cursor_position as usize..].iter().cloned());
                }
                all_data.extend(std::mem::take(&mut self.new_back_values));
                // Mirror the `post_load` shape: only the front and back are materialized;
                // the middles were written by `pre_save` and are read back lazily.
                let num_chunks = all_data.chunks(N).len();
                self.stored_first_index = 0;
                self.stored_num_buckets = u32::try_from(num_chunks).expect("verified in pre_save");
                self.current_middle = None;
                self.stored_front_position = 0;
                if num_chunks == 0 {
                    self.stored_front = Vec::new();
                    self.stored_back = Vec::new();
                    self.cursor = None;
                } else {
                    // The back is the last chunk, kept only when distinct from the front.
                    self.stored_back = if num_chunks >= 2 {
                        all_data[(num_chunks - 1) * N..].to_vec()
                    } else {
                        Vec::new()
                    };
                    all_data.truncate(N); // keep only the front chunk
                    self.stored_front = all_data;
                    self.cursor = Some(Cursor {
                        offset: 0,
                        position: 0,
                    });
                }
            }
            SaveCase::Patch {
                new_first_index,
                remaining_count,
            } => {
                let cursor = self.cursor.expect("Patch implies a live cursor");
                // Promote the current front bucket to the new saved front. With >= 2
                // buckets surviving the cursor is never on the back, so a moved front is
                // always the materialized `current_middle`.
                if plan.remaining_offset > 0 {
                    self.stored_front = self
                        .current_middle
                        .take()
                        .expect("the middle the cursor points into is loaded");
                }
                self.current_middle = None;
                self.stored_first_index = new_first_index;
                self.stored_num_buckets = if self.new_back_values.is_empty() {
                    remaining_count
                } else {
                    // Merge old back + new values; the last chunk is the new back and the
                    // earlier chunks are new middles (storage-only).
                    let mut merged = std::mem::take(&mut self.stored_back);
                    merged.extend(std::mem::take(&mut self.new_back_values));
                    let num_new_chunks =
                        u32::try_from(merged.chunks(N).len()).expect("verified in pre_save");
                    // Keep in memory only the elements of the last chunk.
                    let back_start = (num_new_chunks as usize - 1) * N;
                    self.stored_back = merged.split_off(back_start);
                    (remaining_count - 1) + num_new_chunks
                };
                // The promoted (or unchanged) front bucket is now at offset 0.
                self.cursor = Some(Cursor {
                    offset: 0,
                    position: cursor.position,
                });
                self.stored_front_position = plan.cursor_position;
            }
        }
    }
415
416    fn clear(&mut self) {
417        // Leaves the saved layout in place (the `rollback` anchor); the next save sees
418        // `delete_storage_first` and wipes storage regardless.
419        self.delete_storage_first = true;
420        self.new_back_values.clear();
421        self.cursor = None;
422        self.current_middle = None;
423    }
424}
425
426impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
427where
428    Self: View,
429{
430    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
431        Ok(BucketQueueView {
432            context: self.context.clone(),
433            new_back_values: self.new_back_values.clone(),
434            stored_first_index: self.stored_first_index,
435            stored_num_buckets: self.stored_num_buckets,
436            stored_front: self.stored_front.clone(),
437            stored_back: self.stored_back.clone(),
438            stored_front_position: self.stored_front_position,
439            cursor: self.cursor,
440            current_middle: self.current_middle.clone(),
441            delete_storage_first: self.delete_storage_first,
442        })
443    }
444}
445
/// Pattern describing how a save will affect storage and in-memory state.
/// Derived from `&self` in [`BucketQueueView::save_plan`] and consumed by both
/// `pre_save` (storage batch) and `post_save` (in-memory state).
///
/// The cases are selected from the number of stored buckets that survive the save and
/// from whether new values are waiting to be appended.
#[derive(Debug)]
enum SaveCase {
    /// Drop everything — leaves the queue empty. Selected when no stored bucket
    /// survives and there is no new value.
    Empty,
    /// The single stored bucket survives without structural change; only the
    /// cursor advanced inside the front bucket. Just refresh `front_position`.
    MetadataOnly,
    /// At most one stored bucket survives (after consumption) and there may be
    /// new values to append. Drop all keys and rewrite from scratch.
    Rewrite,
    /// Two or more stored buckets survive. Delete consumed middles, promote a
    /// surviving middle to front if the cursor crossed buckets, and — if there
    /// are new values — merge them into the back and re-chunk.
    Patch {
        /// Storage index of the bucket that becomes the front after the save.
        new_first_index: u32,
        /// Number of stored buckets that survive the save (at least 2).
        remaining_count: u32,
    },
}
467
/// The outcome of classifying a pending save: the case to apply together with the
/// values that both `pre_save` and `post_save` need to apply it consistently.
#[derive(Debug)]
struct SavePlan {
    /// How the save affects storage and in-memory state.
    case: SaveCase,
    /// Relative bucket offset of the first surviving bucket (`cursor.offset`, or
    /// `stored_num_buckets` when everything is dropped).
    remaining_offset: usize,
    /// Position of the cursor within the front bucket, validated as `u32` (the type
    /// stored in `BucketLayout`). Cast to `usize` at the few sites that index into a
    /// bucket's data. It is `0` when there is no live cursor.
    cursor_position: u32,
    /// True if there is any existing storage to clear (for `Empty`/`Rewrite`).
    has_storage: bool,
}
481
482/// Adds a relative `offset` to a base bucket index with checked arithmetic.
483///
484/// Bucket indices are `u32` (the width stored in `BucketLayout`); a queue would need
485/// billions of bucket rotations to overflow, but the additions that walk a base index
486/// across bucket offsets are guarded here to stay consistent with the rest of the view.
487fn checked_bucket_index(base: u32, offset: usize) -> Result<u32, ArithmeticError> {
488    let offset = u32::try_from(offset).map_err(|_| ArithmeticError::Overflow)?;
489    base.checked_add(offset).ok_or(ArithmeticError::Overflow)
490}
491
492impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
493    fn front_key(&self) -> Vec<u8> {
494        self.context.base_key().base_tag(KeyTag::Front as u8)
495    }
496
497    fn back_key(&self) -> Vec<u8> {
498        self.context.base_key().base_tag(KeyTag::Back as u8)
499    }
500
501    fn layout_key(&self) -> Vec<u8> {
502        self.context.base_key().base_tag(KeyTag::Layout as u8)
503    }
504
505    /// Gets the key for a middle bucket with the given storage index.
506    fn get_middle_key(&self, index: u32) -> Result<Vec<u8>, ViewError> {
507        Ok(self
508            .context
509            .base_key()
510            .derive_tag_key(KeyTag::Middle as u8, &index)?)
511    }
512
513    /// The data of the bucket the live cursor points into (the queue front within the
514    /// stored buckets), or `None` when the stored portion is exhausted.
515    fn current_front_data(&self) -> Option<&[T]> {
516        let cursor = self.cursor?;
517        let num_buckets = self.stored_num_buckets as usize;
518        Some(if cursor.offset == 0 {
519            &self.stored_front
520        } else if cursor.offset + 1 == num_buckets {
521            &self.stored_back
522        } else {
523            self.current_middle
524                .as_deref()
525                .expect("the middle bucket the cursor points into is loaded")
526        })
527    }
528
529    /// The number of elements in the stored bucket at relative `offset`. Middle buckets
530    /// always hold exactly `N` (invariant), so they need not be materialized.
531    fn bucket_len(&self, offset: usize) -> usize {
532        if offset == 0 {
533            self.stored_front.len()
534        } else if offset + 1 == self.stored_num_buckets as usize {
535            self.stored_back.len()
536        } else {
537            N
538        }
539    }
540
541    /// Classifies the pending save based on the current view state.
542    /// Called once by `pre_save` and once by `post_save`; since `&self` is
543    /// unchanged between the two it returns the same plan both times.
544    fn save_plan(&self) -> Result<SavePlan, ViewError> {
545        let num_buckets = self.stored_num_buckets as usize;
546        let remaining_offset = if self.delete_storage_first {
547            num_buckets
548        } else {
549            self.cursor.map_or(num_buckets, |c| c.offset)
550        };
551        let remaining_count = num_buckets - remaining_offset;
552        let cursor_position = u32::try_from(self.cursor.map_or(0, |c| c.position))
553            .map_err(|_| ArithmeticError::Overflow)?;
554        let has_storage = self.stored_num_buckets > 0 || self.delete_storage_first;
555        let new_back_empty = self.new_back_values.is_empty();
556
557        let case = if remaining_count == 0 && new_back_empty {
558            SaveCase::Empty
559        } else if remaining_count == 1 && remaining_offset == 0 && new_back_empty {
560            SaveCase::MetadataOnly
561        } else if remaining_count <= 1 {
562            SaveCase::Rewrite
563        } else {
564            SaveCase::Patch {
565                new_first_index: checked_bucket_index(self.stored_first_index, remaining_offset)?,
566                remaining_count: u32::try_from(remaining_count)
567                    .map_err(|_| ArithmeticError::Overflow)?,
568            }
569        };
570        Ok(SavePlan {
571            case,
572            remaining_offset,
573            cursor_position,
574            has_storage,
575        })
576    }
577
578    /// Splits `data` into N-sized chunks and writes them as front (KeyTag::Front),
579    /// middles (KeyTag::Middle, starting at `first_index`+1), and back (KeyTag::Back).
580    /// Returns the total number of buckets written. The caller is responsible for
581    /// writing the matching `BucketLayout` entry. Used by `Rewrite`.
582    fn write_chunks(
583        &self,
584        batch: &mut Batch,
585        data: &[T],
586        first_index: u32,
587    ) -> Result<u32, ViewError>
588    where
589        T: Serialize + Clone,
590    {
591        let chunks = data.chunks(N).collect::<Vec<_>>();
592        let num_buckets = u32::try_from(chunks.len()).map_err(|_| ArithmeticError::Overflow)?;
593        batch.put_key_value(self.front_key(), &chunks[0].to_vec())?;
594        for (i, chunk) in chunks
595            .iter()
596            .enumerate()
597            .skip(1)
598            .take(chunks.len().saturating_sub(2))
599        {
600            let key = self.get_middle_key(checked_bucket_index(first_index, i)?)?;
601            batch.put_key_value(key, &chunk.to_vec())?;
602        }
603        if num_buckets >= 2 {
604            batch.put_key_value(self.back_key(), &chunks.last().unwrap().to_vec())?;
605        }
606        Ok(num_buckets)
607    }
608
609    /// Gets the number of entries that are in the container and in storage.
610    fn stored_count(&self) -> usize {
611        if self.delete_storage_first {
612            return 0;
613        }
614        let Some(cursor) = self.cursor else {
615            return 0;
616        };
617        let remaining = self.stored_num_buckets as usize - cursor.offset;
618        // Current front bucket: count the valid elements after the cursor position.
619        let front_count = self.bucket_len(cursor.offset) - cursor.position;
620        if remaining == 1 {
621            return front_count;
622        }
623        // Back bucket plus the full middles between the cursor and the back.
624        let back_count = self.stored_back.len();
625        let num_middles = remaining - 2;
626        front_count + num_middles * N + back_count
627    }
628
629    /// The total number of entries of the container.
630    /// ```rust
631    /// # tokio_test::block_on(async {
632    /// # use linera_views::context::MemoryContext;
633    /// # use linera_views::bucket_queue_view::BucketQueueView;
634    /// # use crate::linera_views::views::View;
635    /// # let context = MemoryContext::new_for_testing(());
636    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
637    /// queue.push_back(34);
638    /// assert_eq!(queue.count(), 1);
639    /// # })
640    /// ```
641    pub fn count(&self) -> usize {
642        self.stored_count() + self.new_back_values.len()
643    }
644}
645
646impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
647    /// Gets a reference on the front value if any.
648    /// ```rust
649    /// # tokio_test::block_on(async {
650    /// # use linera_views::context::MemoryContext;
651    /// # use linera_views::bucket_queue_view::BucketQueueView;
652    /// # use crate::linera_views::views::View;
653    /// # let context = MemoryContext::new_for_testing(());
654    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
655    /// queue.push_back(34);
656    /// queue.push_back(42);
657    /// assert_eq!(queue.front().cloned(), Some(34));
658    /// # })
659    /// ```
660    pub fn front(&self) -> Option<&T> {
661        match self.cursor {
662            Some(cursor) => {
663                let data = self
664                    .current_front_data()
665                    .expect("cursor is Some, so the current front is available");
666                Some(&data[cursor.position])
667            }
668            None => self.new_back_values.front(),
669        }
670    }
671
672    /// Reads the front value, if any.
673    /// ```rust
674    /// # tokio_test::block_on(async {
675    /// # use linera_views::context::MemoryContext;
676    /// # use linera_views::bucket_queue_view::BucketQueueView;
677    /// # use crate::linera_views::views::View;
678    /// # let context = MemoryContext::new_for_testing(());
679    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
680    /// queue.push_back(34);
681    /// queue.push_back(42);
682    /// let front = queue.front_mut().unwrap();
683    /// *front = 43;
684    /// assert_eq!(queue.front().cloned(), Some(43));
685    /// # })
686    /// ```
687    pub fn front_mut(&mut self) -> Option<&mut T> {
688        match self.cursor {
689            Some(cursor) => {
690                let num_buckets = self.stored_num_buckets as usize;
691                let data = if cursor.offset == 0 {
692                    &mut self.stored_front
693                } else if cursor.offset + 1 == num_buckets {
694                    &mut self.stored_back
695                } else {
696                    self.current_middle
697                        .as_mut()
698                        .expect("the middle bucket the cursor points into is loaded")
699                };
700                Some(
701                    data.get_mut(cursor.position)
702                        .expect("cursor.position must be a valid position within the front bucket"),
703                )
704            }
705            None => self.new_back_values.front_mut(),
706        }
707    }
708
709    /// Deletes the front value, if any.
710    /// ```rust
711    /// # tokio_test::block_on(async {
712    /// # use linera_views::context::MemoryContext;
713    /// # use linera_views::bucket_queue_view::BucketQueueView;
714    /// # use crate::linera_views::views::View;
715    /// # let context = MemoryContext::new_for_testing(());
716    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
717    /// queue.push_back(34 as u128);
718    /// queue.delete_front().await.unwrap();
719    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
720    /// # })
721    /// ```
722    pub async fn delete_front(&mut self) -> Result<(), ViewError> {
723        let Some(cursor) = self.cursor else {
724            self.new_back_values.pop_front();
725            return Ok(());
726        };
727        let current_len = self
728            .current_front_data()
729            .expect("cursor points into the stored buckets")
730            .len();
731        let num_buckets = self.stored_num_buckets as usize;
732        let mut offset = cursor.offset;
733        let mut position = cursor.position + 1;
734        if position == current_len {
735            offset += 1;
736            position = 0;
737            if offset == num_buckets {
738                // The stored portion is now exhausted.
739                self.cursor = None;
740                self.current_middle = None;
741                return Ok(());
742            }
743            // The cursor crossed into the bucket at `offset` (>= 1). Materialize it as the
744            // new front *before* moving the cursor, so a failed load leaves the view's
745            // invariant intact (the current front bucket is always materialized).
746            if offset + 1 == num_buckets {
747                // The back bucket is already materialized.
748                self.current_middle = None;
749            } else {
750                let index = checked_bucket_index(self.stored_first_index, offset)?;
751                let key = self.get_middle_key(index)?;
752                let data = self
753                    .context
754                    .store()
755                    .read_value(&key)
756                    .await?
757                    .ok_or_else(|| {
758                        ViewError::MissingEntries("BucketQueueView::delete_front".into())
759                    })?;
760                self.current_middle = Some(data);
761            }
762        }
763        self.cursor = Some(Cursor { offset, position });
764        Ok(())
765    }
766
767    /// Pushes a value to the end of the queue.
768    /// ```rust
769    /// # tokio_test::block_on(async {
770    /// # use linera_views::context::MemoryContext;
771    /// # use linera_views::bucket_queue_view::BucketQueueView;
772    /// # use crate::linera_views::views::View;
773    /// # let context = MemoryContext::new_for_testing(());
774    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
775    /// queue.push_back(34);
776    /// assert_eq!(queue.elements().await.unwrap(), vec![34]);
777    /// # })
778    /// ```
779    pub fn push_back(&mut self, value: T) {
780        self.new_back_values.push_back(value);
781    }
782
783    /// Returns the list of elements in the queue.
784    /// ```rust
785    /// # tokio_test::block_on(async {
786    /// # use linera_views::context::MemoryContext;
787    /// # use linera_views::bucket_queue_view::BucketQueueView;
788    /// # use crate::linera_views::views::View;
789    /// # let context = MemoryContext::new_for_testing(());
790    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
791    /// queue.push_back(34);
792    /// queue.push_back(37);
793    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
794    /// # })
795    /// ```
796    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
797        let count = self.count();
798        self.read_context(self.cursor, count).await
799    }
800
801    /// Returns the last element of a bucket queue view
802    /// ```rust
803    /// # tokio_test::block_on(async {
804    /// # use linera_views::context::MemoryContext;
805    /// # use linera_views::bucket_queue_view::BucketQueueView;
806    /// # use crate::linera_views::views::View;
807    /// # let context = MemoryContext::new_for_testing(());
808    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
809    /// queue.push_back(34);
810    /// queue.push_back(37);
811    /// assert_eq!(queue.back().await.unwrap(), Some(37));
812    /// # })
813    /// ```
814    pub async fn back(&mut self) -> Result<Option<T>, ViewError>
815    where
816        T: Clone,
817    {
818        if let Some(value) = self.new_back_values.back() {
819            return Ok(Some(value.clone()));
820        }
821        if self.cursor.is_none() {
822            return Ok(None);
823        }
824        // The last stored element is the back bucket's last (or the front's, for a
825        // single stored bucket).
826        let last = if self.stored_num_buckets >= 2 {
827            self.stored_back.last()
828        } else {
829            self.stored_front.last()
830        };
831        Ok(last.cloned())
832    }
833
834    async fn read_context(
835        &self,
836        cursor: Option<Cursor>,
837        count: usize,
838    ) -> Result<Vec<T>, ViewError> {
839        if count == 0 {
840            return Ok(Vec::new());
841        }
842        let mut elements = Vec::<T>::new();
843        let mut count_remain = count;
844        if let Some(cursor) = cursor {
845            let num_buckets = self.stored_num_buckets as usize;
846            // First pass: gather the storage keys of the middle buckets we will read.
847            let mut keys = Vec::new();
848            let mut position = cursor.position;
849            let mut remain = count;
850            for offset in cursor.offset..num_buckets {
851                if offset != 0 && offset + 1 != num_buckets {
852                    let index = checked_bucket_index(self.stored_first_index, offset)?;
853                    keys.push(self.get_middle_key(index)?);
854                }
855                let size = self.bucket_len(offset) - position;
856                if size >= remain {
857                    break;
858                }
859                remain -= size;
860                position = 0;
861            }
862            let values = self.context.store().read_multi_values_bytes(&keys).await?;
863            // Second pass: assemble the elements, reading middles from `values`.
864            let mut value_pos = 0;
865            let mut position = cursor.position;
866            for offset in cursor.offset..num_buckets {
867                let read_buf;
868                let data: &[T] = if offset == 0 {
869                    &self.stored_front
870                } else if offset + 1 == num_buckets {
871                    &self.stored_back
872                } else {
873                    let value = values[value_pos].as_ref().ok_or_else(|| {
874                        ViewError::MissingEntries("BucketQueueView::read_context".into())
875                    })?;
876                    value_pos += 1;
877                    read_buf = bcs::from_bytes::<Vec<T>>(value)?;
878                    &read_buf
879                };
880                let size = data.len() - position;
881                elements.extend(data[position..].iter().take(count_remain).cloned());
882                if size >= count_remain {
883                    return Ok(elements);
884                }
885                count_remain -= size;
886                position = 0;
887            }
888        }
889        let count_read = std::cmp::min(count_remain, self.new_back_values.len());
890        elements.extend(self.new_back_values.range(0..count_read).cloned());
891        Ok(elements)
892    }
893
894    /// Returns the first elements of a bucket queue view
895    /// ```rust
896    /// # tokio_test::block_on(async {
897    /// # use linera_views::context::MemoryContext;
898    /// # use linera_views::bucket_queue_view::BucketQueueView;
899    /// # use crate::linera_views::views::View;
900    /// # let context = MemoryContext::new_for_testing(());
901    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
902    /// queue.push_back(34);
903    /// queue.push_back(37);
904    /// queue.push_back(47);
905    /// assert_eq!(queue.read_front(2).await.unwrap(), vec![34, 37]);
906    /// # })
907    /// ```
908    pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
909        let count = std::cmp::min(count, self.count());
910        self.read_context(self.cursor, count).await
911    }
912
913    /// Returns the last element of a bucket queue view
914    /// ```rust
915    /// # tokio_test::block_on(async {
916    /// # use linera_views::context::MemoryContext;
917    /// # use linera_views::bucket_queue_view::BucketQueueView;
918    /// # use crate::linera_views::views::View;
919    /// # let context = MemoryContext::new_for_testing(());
920    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
921    /// queue.push_back(34);
922    /// queue.push_back(37);
923    /// queue.push_back(47);
924    /// assert_eq!(queue.read_back(2).await.unwrap(), vec![37, 47]);
925    /// # })
926    /// ```
927    pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
928        let count = std::cmp::min(count, self.count());
929        if count <= self.new_back_values.len() {
930            let start = self.new_back_values.len() - count;
931            Ok(self
932                .new_back_values
933                .range(start..)
934                .cloned()
935                .collect::<Vec<_>>())
936        } else {
937            let mut increment = self.count() - count;
938            let Some(cursor) = self.cursor else {
939                unreachable!("Cursor should be Some when stored_count > 0");
940            };
941            let num_buckets = self.stored_num_buckets as usize;
942            let mut position = cursor.position;
943            for offset in cursor.offset..num_buckets {
944                let size = self.bucket_len(offset) - position;
945                if increment < size {
946                    return self
947                        .read_context(
948                            Some(Cursor {
949                                offset,
950                                position: position + increment,
951                            }),
952                            count,
953                        )
954                        .await;
955                }
956                increment -= size;
957                position = 0;
958            }
959            unreachable!("BucketQueueView::read_back: iterated past all stored buckets without finding the requested position");
960        }
961    }
962
963    async fn load_all(&mut self) -> Result<(), ViewError> {
964        if !self.delete_storage_first {
965            let elements = self.elements().await?;
966            self.new_back_values.clear();
967            for elt in elements {
968                self.new_back_values.push_back(elt);
969            }
970            self.cursor = None;
971            self.current_middle = None;
972            self.delete_storage_first = true;
973        }
974        Ok(())
975    }
976
977    /// Gets a mutable iterator on the entries of the queue
978    /// ```rust
979    /// # tokio_test::block_on(async {
980    /// # use linera_views::context::MemoryContext;
981    /// # use linera_views::bucket_queue_view::BucketQueueView;
982    /// # use linera_views::views::View;
983    /// # let context = MemoryContext::new_for_testing(());
984    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
985    /// queue.push_back(34);
986    /// let mut iter = queue.try_iter_mut().await.unwrap();
987    /// let value = iter.next().unwrap();
988    /// *value = 42;
989    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
990    /// # })
991    /// ```
992    pub async fn try_iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
993        self.load_all().await?;
994        Ok(self.new_back_values.iter_mut())
995    }
996}
997
998impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
999    for BucketQueueView<C, T, N>
1000where
1001    Self: View,
1002{
1003    type Hasher = sha3::Sha3_256;
1004
1005    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1006        self.hash().await
1007    }
1008
1009    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1010        #[cfg(with_metrics)]
1011        let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
1012        let elements = self.elements().await?;
1013        let mut hasher = sha3::Sha3_256::default();
1014        hasher.update_with_bcs_bytes(&elements)?;
1015        Ok(hasher.finalize())
1016    }
1017}
1018
/// Type wrapping `BucketQueueView` while memoizing the hash.
pub type HashedBucketQueueView<C, T, const N: usize> =
    WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
1022
/// Wrapper around `BucketQueueView` to compute hashes based on the history of changes.
///
/// This is [`HistoricallyHashableView`] instantiated with a `BucketQueueView<C, T, N>`.
pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
    HistoricallyHashableView<C, BucketQueueView<C, T, N>>;
1026
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use linera_base::data_types::ArithmeticError;

    use super::BucketQueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    // The GraphQL type name combines the mangled element type name with a hash of the
    // element type.
    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
        for BucketQueueView<C, T, N>
    {
        fn type_name() -> Cow<'static, str> {
            let element_name = mangle(T::type_name());
            let element_hash = hash_name::<T>();
            Cow::Owned(format!(
                "BucketQueueView_{}_{:08x}",
                element_name, element_hash
            ))
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
    where
        C: Send + Sync,
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Number of entries, reported as an overflow error if it does not fit in `u32`.
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            let total = u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?;
            Ok(total)
        }

        // The first `count` entries, or every entry when `count` is not given.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            let limit = count.unwrap_or_else(|| self.count());
            Ok(self.read_front(limit).await?)
        }
    }
}
1070
1071#[cfg(test)]
1072mod tests {
1073    use super::*;
1074    use crate::{
1075        batch::Batch,
1076        context::{Context, MemoryContext},
1077        store::WritableKeyValueStore as _,
1078    };
1079
1080    /// Regression test: a failed load while advancing the cursor in `delete_front`
1081    /// must not leave the view in a state where the current front bucket is not
1082    /// materialized. The next bucket is loaded *before* the cursor advances, so a
1083    /// failed load leaves the cursor (and `current_middle`) untouched.
1084    #[tokio::test]
1085    async fn delete_front_load_failure_preserves_invariant() -> Result<(), ViewError> {
1086        let context = MemoryContext::new_for_testing(());
1087        let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;
1088        // Six elements -> front=[1,2], middle=[3,4] at index 1, back=[5,6].
1089        for value in [1u8, 2, 3, 4, 5, 6] {
1090            view.push_back(value);
1091        }
1092        save(&context, &mut view).await?;
1093
1094        let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;
1095
1096        // Delete the middle bucket so that loading it during `delete_front` fails.
1097        let middle_key = view.get_middle_key(1)?;
1098        let mut batch = Batch::new();
1099        batch.delete_key(middle_key);
1100        context.store().write_batch(batch).await?;
1101
1102        view.delete_front().await?;
1103        let err = view.delete_front().await.expect_err("load should fail");
1104        assert!(matches!(err, ViewError::MissingEntries(_)));
1105
1106        save(&context, &mut view).await?;
1107
1108        Ok(())
1109    }
1110
1111    /// Roundtrip a queue through save/reload at several sizes around `N` to
1112    /// exercise the front/middle/back layout: empty, partial-front-only,
1113    /// exactly-one-bucket, front+back without middles, and several layouts with
1114    /// middle buckets.
1115    #[tokio::test]
1116    async fn save_load_roundtrip_across_sizes() -> Result<(), ViewError> {
1117        const N: usize = 3;
1118        for size in [0usize, 1, 2, N, N + 1, 2 * N, 2 * N + 1, 5 * N, 5 * N - 1] {
1119            let context = MemoryContext::new_for_testing(());
1120            let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1121            for i in 0..u32::try_from(size).unwrap() {
1122                view.push_back(i);
1123            }
1124            save(&context, &mut view).await?;
1125
1126            let reloaded = BucketQueueView::<_, u32, N>::load(context).await?;
1127            let elements = reloaded.elements().await?;
1128            let expected = (0..u32::try_from(size).unwrap()).collect::<Vec<_>>();
1129            assert_eq!(elements, expected, "size = {size}");
1130            assert_eq!(reloaded.count(), size, "count for size = {size}");
1131        }
1132        Ok(())
1133    }
1134
1135    /// Middle buckets must always contain exactly `N` elements after save — this is
1136    /// the invariant that lets the layout track them by count alone. Read the middle
1137    /// bucket keys back from storage and check each holds `N` elements.
1138    #[tokio::test]
1139    async fn middle_buckets_are_always_full() -> Result<(), ViewError> {
1140        const N: usize = 4;
1141        let context = MemoryContext::new_for_testing(());
1142        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1143        // Push enough to create several middles, then save.
1144        for i in 0..u32::try_from(5 * N + 2).unwrap() {
1145            view.push_back(i);
1146        }
1147        save(&context, &mut view).await?;
1148
1149        // Push a partial back to verify the next save merges and re-chunks correctly.
1150        view.push_back(1000);
1151        view.push_back(1001);
1152        save(&context, &mut view).await?;
1153
1154        // Drop a few front elements (less than N), save, and check again.
1155        for _ in 0..(N - 1) {
1156            view.delete_front().await?;
1157        }
1158        save(&context, &mut view).await?;
1159
1160        // After all this, every middle bucket in storage must hold exactly N elements.
1161        let view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1162        let first_index = view.stored_first_index;
1163        for offset in 1..view.stored_num_buckets.saturating_sub(1) {
1164            let key = view.get_middle_key(first_index + offset)?;
1165            let data = context
1166                .store()
1167                .read_value::<Vec<u32>>(&key)
1168                .await?
1169                .expect("middle bucket should be present in storage");
1170            assert_eq!(
1171                data.len(),
1172                N,
1173                "middle at offset {offset} should hold N elements"
1174            );
1175        }
1176        Ok(())
1177    }
1178
1179    /// `stored_count` must be exact across the partial-front and partial-back
1180    /// edge cases, even when the cursor has advanced inside the front bucket.
1181    #[tokio::test]
1182    async fn stored_count_is_exact() -> Result<(), ViewError> {
1183        const N: usize = 3;
1184        let context = MemoryContext::new_for_testing(());
1185        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1186        // 7 elements -> front [0,1,2], middle [3,4,5], back [6].
1187        for i in 0..7u32 {
1188            view.push_back(i);
1189        }
1190        save(&context, &mut view).await?;
1191
1192        let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1193        assert_eq!(view.stored_count(), 7);
1194        view.delete_front().await?; // drop 0
1195        assert_eq!(view.stored_count(), 6);
1196        view.delete_front().await?; // drop 1
1197        assert_eq!(view.stored_count(), 5);
1198        view.delete_front().await?; // drop 2, crosses into middle bucket
1199        assert_eq!(view.stored_count(), 4);
1200        Ok(())
1201    }
1202
1203    /// Build a view that hits each `SaveCase` variant and verify dispatch +
1204    /// roundtrip. Pinning each case to a concrete scenario means a refactor
1205    /// that silently drops a branch fails loudly instead of relying on the
1206    /// random fuzz to catch it eventually.
1207    #[tokio::test]
1208    async fn save_plan_covers_each_case() -> Result<(), ViewError> {
1209        const N: usize = 3;
1210
1211        // Empty: a freshly-loaded view with no pending changes.
1212        let context = MemoryContext::new_for_testing(());
1213        let view = BucketQueueView::<_, u32, N>::load(context).await?;
1214        assert!(matches!(view.save_plan()?.case, SaveCase::Empty));
1215
1216        // MetadataOnly: a single stored bucket with the cursor advanced inside it.
1217        let context = MemoryContext::new_for_testing(());
1218        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1219        view.push_back(10);
1220        view.push_back(20);
1221        save(&context, &mut view).await?;
1222        let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1223        view.delete_front().await?;
1224        assert!(matches!(view.save_plan()?.case, SaveCase::MetadataOnly));
1225
1226        // Rewrite: <= 1 bucket survives but there is a structural change
1227        // (consumed past the front bucket; only the back remains).
1228        let context = MemoryContext::new_for_testing(());
1229        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1230        for i in 0..5u32 {
1231            view.push_back(i);
1232        }
1233        save(&context, &mut view).await?;
1234        let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1235        for _ in 0..N {
1236            view.delete_front().await?;
1237        }
1238        assert!(matches!(view.save_plan()?.case, SaveCase::Rewrite));
1239
1240        // Patch: >= 2 buckets survive (here: front + middle + back, with a
1241        // pending new value to force the re-chunk path).
1242        let context = MemoryContext::new_for_testing(());
1243        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1244        for i in 0..7u32 {
1245            view.push_back(i);
1246        }
1247        save(&context, &mut view).await?;
1248        let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1249        view.push_back(100);
1250        assert!(matches!(view.save_plan()?.case, SaveCase::Patch { .. }));
1251
1252        Ok(())
1253    }
1254
1255    /// The most intricate single save: in one `Patch` the cursor has crossed a bucket
1256    /// boundary (so the front is promoted from a former middle), one or more middles
1257    /// survive unchanged, *and* enough new values are pending that merging them with the
1258    /// old back re-chunks into several new middles plus a new back. This exercises front
1259    /// promotion, middle key preservation, and the re-chunk path simultaneously — the one
1260    /// combination `save_plan_covers_each_case` only hits piecemeal.
1261    #[tokio::test]
1262    async fn patch_promotes_front_keeps_middles_and_rechunks() -> Result<(), ViewError> {
1263        const N: usize = 3;
1264        let context = MemoryContext::new_for_testing(());
1265        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1266        // 14 elements -> front [0,1,2], middles [3,4,5] [6,7,8] [9,10,11], back [12,13].
1267        for i in 0..14u32 {
1268            view.push_back(i);
1269        }
1270        save(&context, &mut view).await?;
1271
1272        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1273        // Drop 0,1,2,3: the cursor crosses out of the front bucket and lands at value 4
1274        // inside the former middle [3,4,5], which must now be promoted to the front.
1275        for _ in 0..4 {
1276            view.delete_front().await?;
1277        }
1278        // Push 6 values so that merging with the partial back [12,13] yields
1279        // [12,13,100][101,102,103][104,105]: two new middles and a new back.
1280        for v in 100..106u32 {
1281            view.push_back(v);
1282        }
1283
1284        // Confirm we are about to take the Patch path with the front having moved by a
1285        // whole bucket (remaining_offset == 1) and four buckets surviving the cursor.
1286        let plan = view.save_plan()?;
1287        assert_eq!(plan.remaining_offset, 1);
1288        assert!(matches!(
1289            plan.case,
1290            SaveCase::Patch {
1291                remaining_count: 4,
1292                ..
1293            }
1294        ));
1295
1296        save(&context, &mut view).await?;
1297
1298        // Reload from storage and check the full sequence survived the re-chunk.
1299        let view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1300        let expected: Vec<u32> = (4..14).chain(100..106).collect();
1301        assert_eq!(view.elements().await?, expected);
1302        assert_eq!(view.count(), expected.len());
1303
1304        // The layout grew to 6 buckets (front + 4 middles + back) and every middle still
1305        // holds exactly N — the invariant the whole design rests on.
1306        assert_eq!(view.stored_num_buckets, 6);
1307        let first_index = view.stored_first_index;
1308        for offset in 1..view.stored_num_buckets - 1 {
1309            let key = view.get_middle_key(first_index + offset)?;
1310            let data = context
1311                .store()
1312                .read_value::<Vec<u32>>(&key)
1313                .await?
1314                .expect("middle bucket should be present in storage");
1315            assert_eq!(data.len(), N, "middle at offset {offset} must hold N");
1316        }
1317        Ok(())
1318    }
1319
1320    /// N=1 is degenerate: every bucket holds exactly one element, the front
1321    /// and back can't share a bucket, and every middle is also a single
1322    /// element. Exercise enough operations to cross several bucket boundaries.
1323    #[tokio::test]
1324    async fn n_equals_one_roundtrip() -> Result<(), ViewError> {
1325        let context = MemoryContext::new_for_testing(());
1326        let mut view = BucketQueueView::<_, u32, 1>::load(context.clone()).await?;
1327        for i in 0..5u32 {
1328            view.push_back(i);
1329        }
1330        save(&context, &mut view).await?;
1331
1332        let mut view = BucketQueueView::<_, u32, 1>::load(context.clone()).await?;
1333        assert_eq!(view.elements().await?, vec![0, 1, 2, 3, 4]);
1334        view.delete_front().await?;
1335        view.delete_front().await?;
1336        view.push_back(99);
1337        save(&context, &mut view).await?;
1338
1339        let view = BucketQueueView::<_, u32, 1>::load(context).await?;
1340        assert_eq!(view.elements().await?, vec![2, 3, 4, 99]);
1341        Ok(())
1342    }
1343
1344    /// `rollback` must wipe in-memory edits and restore the cursor / state
1345    /// to whatever was last saved.
1346    #[tokio::test]
1347    async fn rollback_restores_saved_state() -> Result<(), ViewError> {
1348        const N: usize = 3;
1349        let context = MemoryContext::new_for_testing(());
1350        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1351        for i in 0..5u32 {
1352            view.push_back(i);
1353        }
1354        save(&context, &mut view).await?;
1355
1356        let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1357        view.delete_front().await?;
1358        view.delete_front().await?;
1359        view.push_back(100);
1360        view.push_back(101);
1361        assert_eq!(view.elements().await?, vec![2, 3, 4, 100, 101]);
1362
1363        view.rollback();
1364        assert_eq!(view.elements().await?, vec![0, 1, 2, 3, 4]);
1365
1366        // After rollback, has_pending_changes must be false again.
1367        assert!(!view.has_pending_changes().await);
1368        Ok(())
1369    }
1370
1371    /// `clear()` followed by `rollback()` must restore the last-saved state,
1372    /// including a non-zero saved front position. `rollback` is synchronous and
1373    /// rebuilds the cursor purely from the in-memory saved layout, so `clear` must
1374    /// preserve it.
1375    #[tokio::test]
1376    async fn rollback_after_clear_restores_front_position() -> Result<(), ViewError> {
1377        const N: usize = 5;
1378        let context = MemoryContext::new_for_testing(());
1379        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1380        for value in [10u32, 20, 30] {
1381            view.push_back(value);
1382        }
1383        save(&context, &mut view).await?;
1384
1385        // Advance the saved front past position 0 (single bucket -> MetadataOnly).
1386        let mut view = BucketQueueView::<_, u32, N>::load(context.clone()).await?;
1387        view.delete_front().await?; // drop 10
1388        save(&context, &mut view).await?;
1389
1390        let mut view = BucketQueueView::<_, u32, N>::load(context).await?;
1391        assert_eq!(view.elements().await?, vec![20, 30]);
1392
1393        view.clear();
1394        assert_eq!(view.elements().await?, Vec::<u32>::new());
1395
1396        view.rollback();
1397        assert_eq!(view.elements().await?, vec![20, 30]);
1398        assert!(!view.has_pending_changes().await);
1399        Ok(())
1400    }
1401
1402    async fn save<V: View>(context: &V::Context, view: &mut V) -> Result<(), ViewError> {
1403        let mut batch = Batch::new();
1404        view.pre_save(&mut batch)?;
1405        context.store().write_batch(batch).await?;
1406        view.post_save();
1407        Ok(())
1408    }
1409}