linera_views/views/
bucket_queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{vec_deque::IterMut, VecDeque};
5
6use allocative::Allocative;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10
11use crate::{
12    batch::Batch,
13    common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
14    context::Context,
15    hashable_wrapper::WrappedHashableContainerView,
16    historical_hash_wrapper::HistoricallyHashableView,
17    store::ReadableKeyValueStore as _,
18    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
19};
20
21#[cfg(with_metrics)]
22mod metrics {
23    use std::sync::LazyLock;
24
25    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
26    use prometheus::HistogramVec;
27
28    /// The runtime of hash computation
29    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
30        register_histogram_vec(
31            "bucket_queue_view_hash_runtime",
32            "BucketQueueView hash runtime",
33            &[],
34            exponential_bucket_latencies(5.0),
35        )
36    });
37}
38
39/// Key tags to create the sub-keys of a [`BucketQueueView`] on top of the base key.
40#[repr(u8)]
41enum KeyTag {
42    /// Key tag for the front bucket (index 0).
43    Front = MIN_VIEW_TAG,
44    /// Key tag for the `BucketStore`.
45    Store,
46    /// Key tag for the content of non-front buckets (index > 0).
47    Index,
48}
49
50/// The metadata of the view in storage.
51#[derive(Clone, Debug, Default, Serialize, Deserialize)]
52struct BucketStore {
53    /// The descriptions of all stored buckets. The first description is expected to start
54    /// with index 0 (front bucket) and will be ignored.
55    descriptions: Vec<BucketDescription>,
56    /// The position of the front value in the front bucket.
57    front_position: usize,
58}
59
60/// The description of a bucket in storage.
61#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
62struct BucketDescription {
63    /// The length of the bucket (at most N).
64    length: usize,
65    /// The index of the bucket in storage.
66    index: usize,
67}
68
69impl BucketStore {
70    fn len(&self) -> usize {
71        self.descriptions.len()
72    }
73}
74
75/// The position of a value in the stored buckket.
76#[derive(Copy, Clone, Debug, Allocative)]
77struct Cursor {
78    /// The offset of the bucket in the vector of stored buckets.
79    offset: usize,
80    /// The position of the value in the stored bucket.
81    position: usize,
82}
83
84/// The state of a stored bucket in memory.
85#[derive(Clone, Debug, Allocative)]
86enum State<T> {
87    Loaded { data: Vec<T> },
88    NotLoaded { length: usize },
89}
90
91impl<T> Bucket<T> {
92    fn len(&self) -> usize {
93        match &self.state {
94            State::Loaded { data } => data.len(),
95            State::NotLoaded { length } => *length,
96        }
97    }
98
99    fn is_loaded(&self) -> bool {
100        match self.state {
101            State::Loaded { .. } => true,
102            State::NotLoaded { .. } => false,
103        }
104    }
105
106    fn to_description(&self) -> BucketDescription {
107        BucketDescription {
108            length: self.len(),
109            index: self.index,
110        }
111    }
112}
113
114/// A stored bucket.
115#[derive(Clone, Debug, Allocative)]
116struct Bucket<T> {
117    /// The index in storage.
118    index: usize,
119    /// The state of the bucket.
120    state: State<T>,
121}
122
123/// A view that supports a FIFO queue for values of type `T`.
124/// The size `N` has to be chosen by taking into account the size of the type `T`
125/// and the basic size of a block. For example a total size of 100 bytes to 10 KB
126/// seems adequate.
127//#[allocative(bound = "T: Allocative")]
128#[derive(Debug, Allocative)]
129#[allocative(bound = "C, T: Allocative, const N: usize")]
130pub struct BucketQueueView<C, T, const N: usize> {
131    /// The view context.
132    #[allocative(skip)]
133    context: C,
134    /// The stored buckets. Some buckets may not be loaded. The first one is always loaded.
135    stored_buckets: VecDeque<Bucket<T>>,
136    /// The newly inserted back values.
137    new_back_values: VecDeque<T>,
138    /// The position for the stored front value in the first stored bucket.
139    stored_front_position: usize,
140    /// The current position of the front value if it is in the stored buckets, and `None`
141    /// otherwise.
142    cursor: Option<Cursor>,
143    /// Whether the storage is to be deleted or not.
144    delete_storage_first: bool,
145}
146
147impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
148where
149    C: Context,
150    T: Send + Sync + Clone + Serialize + DeserializeOwned,
151{
152    const NUM_INIT_KEYS: usize = 2;
153
154    type Context = C;
155
156    fn context(&self) -> C {
157        self.context.clone()
158    }
159
160    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
161        let key1 = context.base_key().base_tag(KeyTag::Front as u8);
162        let key2 = context.base_key().base_tag(KeyTag::Store as u8);
163        Ok(vec![key1, key2])
164    }
165
166    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
167        let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
168        let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
169        let front = from_bytes_option::<Vec<T>>(value1)?;
170        let mut stored_buckets = VecDeque::from(match front {
171            Some(data) => {
172                let bucket = Bucket {
173                    index: 0,
174                    state: State::Loaded { data },
175                };
176                vec![bucket]
177            }
178            None => {
179                vec![]
180            }
181        });
182        let bucket_store = from_bytes_option_or_default::<BucketStore>(value2)?;
183        // Ignoring `bucket_store.descriptions[0]`.
184        // TODO(#4969): Remove redundant BucketDescription in BucketQueueView.
185        for i in 1..bucket_store.len() {
186            let length = bucket_store.descriptions[i].length;
187            let index = bucket_store.descriptions[i].index;
188            stored_buckets.push_back(Bucket {
189                index,
190                state: State::NotLoaded { length },
191            });
192        }
193        let cursor = if bucket_store.descriptions.is_empty() {
194            None
195        } else {
196            Some(Cursor {
197                offset: 0,
198                position: bucket_store.front_position,
199            })
200        };
201        Ok(Self {
202            context,
203            stored_buckets,
204            stored_front_position: bucket_store.front_position,
205            new_back_values: VecDeque::new(),
206            cursor,
207            delete_storage_first: false,
208        })
209    }
210
211    fn rollback(&mut self) {
212        self.delete_storage_first = false;
213        self.cursor = if self.stored_buckets.is_empty() {
214            None
215        } else {
216            Some(Cursor {
217                offset: 0,
218                position: self.stored_front_position,
219            })
220        };
221        self.new_back_values.clear();
222    }
223
224    async fn has_pending_changes(&self) -> bool {
225        if self.delete_storage_first {
226            return true;
227        }
228        if !self.stored_buckets.is_empty() {
229            let Some(cursor) = self.cursor else {
230                return true;
231            };
232            if cursor.offset != 0 || cursor.position != self.stored_front_position {
233                return true;
234            }
235        }
236        !self.new_back_values.is_empty()
237    }
238
239    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
240        let mut delete_view = false;
241        let mut descriptions = Vec::new();
242        let mut stored_front_position = self.stored_front_position;
243        if self.stored_count() == 0 {
244            let key_prefix = self.context.base_key().bytes.clone();
245            batch.delete_key_prefix(key_prefix);
246            delete_view = true;
247            stored_front_position = 0;
248        } else if let Some(cursor) = self.cursor {
249            // Delete buckets that are before the cursor
250            for i in 0..cursor.offset {
251                let bucket = &self.stored_buckets[i];
252                let index = bucket.index;
253                let key = self.get_bucket_key(index)?;
254                batch.delete_key(key);
255            }
256            stored_front_position = cursor.position;
257            // Build descriptions for remaining buckets
258            let first_index = self.stored_buckets[cursor.offset].index;
259            let start_offset = if first_index != 0 {
260                // Need to move the first remaining bucket to index 0
261                let key = self.get_bucket_key(first_index)?;
262                batch.delete_key(key);
263                let key = self.get_bucket_key(0)?;
264                let bucket = &self.stored_buckets[cursor.offset];
265                let State::Loaded { data } = &bucket.state else {
266                    unreachable!("The front bucket is always loaded.");
267                };
268                batch.put_key_value(key, data)?;
269                descriptions.push(BucketDescription {
270                    length: bucket.len(),
271                    index: 0,
272                });
273                cursor.offset + 1
274            } else {
275                cursor.offset
276            };
277            for bucket in self.stored_buckets.range(start_offset..) {
278                descriptions.push(bucket.to_description());
279            }
280        }
281        if !self.new_back_values.is_empty() {
282            delete_view = false;
283            // Calculate the starting index for new buckets
284            // If stored_count() == 0, all stored buckets are being removed, so start at 0
285            // Otherwise, start after the last remaining bucket
286            let mut index = if self.stored_count() == 0 {
287                0
288            } else if let Some(last_description) = descriptions.last() {
289                last_description.index + 1
290            } else {
291                // This shouldn't happen if stored_count() > 0
292                0
293            };
294            let mut start = 0;
295            while start < self.new_back_values.len() {
296                let end = std::cmp::min(start + N, self.new_back_values.len());
297                let value_chunk: Vec<_> = self.new_back_values.range(start..end).collect();
298                let key = self.get_bucket_key(index)?;
299                batch.put_key_value(key, &value_chunk)?;
300                descriptions.push(BucketDescription {
301                    index,
302                    length: end - start,
303                });
304                index += 1;
305                start = end;
306            }
307        }
308        if !delete_view {
309            let bucket_store = BucketStore {
310                descriptions,
311                front_position: stored_front_position,
312            };
313            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
314            batch.put_key_value(key, &bucket_store)?;
315        }
316        Ok(delete_view)
317    }
318
319    fn post_save(&mut self) {
320        if self.stored_count() == 0 {
321            self.stored_buckets.clear();
322            self.stored_front_position = 0;
323            self.cursor = None;
324        } else if let Some(cursor) = self.cursor {
325            for _ in 0..cursor.offset {
326                self.stored_buckets.pop_front();
327            }
328            self.cursor = Some(Cursor {
329                offset: 0,
330                position: cursor.position,
331            });
332            self.stored_front_position = cursor.position;
333            // We need to ensure that the first index is in the front.
334            self.stored_buckets[0].index = 0;
335        }
336        if !self.new_back_values.is_empty() {
337            let mut index = match self.stored_buckets.back() {
338                Some(bucket) => bucket.index + 1,
339                None => 0,
340            };
341            let new_back_values = std::mem::take(&mut self.new_back_values);
342            let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
343            for value_chunk in new_back_values.chunks(N) {
344                self.stored_buckets.push_back(Bucket {
345                    index,
346                    state: State::Loaded {
347                        data: value_chunk.to_vec(),
348                    },
349                });
350                index += 1;
351            }
352            if self.cursor.is_none() {
353                self.cursor = Some(Cursor {
354                    offset: 0,
355                    position: 0,
356                });
357            }
358        }
359        self.delete_storage_first = false;
360    }
361
362    fn clear(&mut self) {
363        self.delete_storage_first = true;
364        self.new_back_values.clear();
365        self.cursor = None;
366    }
367}
368
369impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
370where
371    Self: View,
372{
373    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
374        Ok(BucketQueueView {
375            context: self.context.clone(),
376            stored_buckets: self.stored_buckets.clone(),
377            new_back_values: self.new_back_values.clone(),
378            stored_front_position: self.stored_front_position,
379            cursor: self.cursor,
380            delete_storage_first: self.delete_storage_first,
381        })
382    }
383}
384
385impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
386    /// Gets the key corresponding to this bucket index.
387    fn get_bucket_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
388        Ok(if index == 0 {
389            self.context.base_key().base_tag(KeyTag::Front as u8)
390        } else {
391            self.context
392                .base_key()
393                .derive_tag_key(KeyTag::Index as u8, &index)?
394        })
395    }
396
397    /// Gets the number of entries in the container that are stored
398    /// ```rust
399    /// # tokio_test::block_on(async {
400    /// # use linera_views::context::MemoryContext;
401    /// # use linera_views::bucket_queue_view::BucketQueueView;
402    /// # use crate::linera_views::views::View;
403    /// # let context = MemoryContext::new_for_testing(());
404    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
405    /// queue.push_back(34);
406    /// assert_eq!(queue.stored_count(), 0);
407    /// # })
408    /// ```
409    pub fn stored_count(&self) -> usize {
410        if self.delete_storage_first {
411            0
412        } else {
413            let Some(cursor) = self.cursor else {
414                return 0;
415            };
416            let mut stored_count = 0;
417            for offset in cursor.offset..self.stored_buckets.len() {
418                stored_count += self.stored_buckets[offset].len();
419            }
420            stored_count -= cursor.position;
421            stored_count
422        }
423    }
424
425    /// The total number of entries of the container
426    /// ```rust
427    /// # tokio_test::block_on(async {
428    /// # use linera_views::context::MemoryContext;
429    /// # use linera_views::bucket_queue_view::BucketQueueView;
430    /// # use crate::linera_views::views::View;
431    /// # let context = MemoryContext::new_for_testing(());
432    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
433    /// queue.push_back(34);
434    /// assert_eq!(queue.count(), 1);
435    /// # })
436    /// ```
437    pub fn count(&self) -> usize {
438        self.stored_count() + self.new_back_values.len()
439    }
440}
441
442impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
443    /// Gets a reference on the front value if any.
444    /// ```rust
445    /// # tokio_test::block_on(async {
446    /// # use linera_views::context::MemoryContext;
447    /// # use linera_views::bucket_queue_view::BucketQueueView;
448    /// # use crate::linera_views::views::View;
449    /// # let context = MemoryContext::new_for_testing(());
450    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
451    /// queue.push_back(34);
452    /// queue.push_back(42);
453    /// assert_eq!(queue.front().cloned(), Some(34));
454    /// # })
455    /// ```
456    pub fn front(&self) -> Option<&T> {
457        match self.cursor {
458            Some(Cursor { offset, position }) => {
459                let bucket = &self.stored_buckets[offset];
460                let State::Loaded { data } = &bucket.state else {
461                    unreachable!("The front bucket should always be loaded");
462                };
463                Some(&data[position])
464            }
465            None => self.new_back_values.front(),
466        }
467    }
468
469    /// Reads the front value, if any.
470    /// ```rust
471    /// # tokio_test::block_on(async {
472    /// # use linera_views::context::MemoryContext;
473    /// # use linera_views::bucket_queue_view::BucketQueueView;
474    /// # use crate::linera_views::views::View;
475    /// # let context = MemoryContext::new_for_testing(());
476    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
477    /// queue.push_back(34);
478    /// queue.push_back(42);
479    /// let front = queue.front_mut().unwrap();
480    /// *front = 43;
481    /// assert_eq!(queue.front().cloned(), Some(43));
482    /// # })
483    /// ```
484    pub fn front_mut(&mut self) -> Option<&mut T> {
485        match self.cursor {
486            Some(Cursor { offset, position }) => {
487                let bucket = self
488                    .stored_buckets
489                    .get_mut(offset)
490                    .expect("cursor.offset must be a valid index into stored_buckets");
491                let State::Loaded { data } = &mut bucket.state else {
492                    unreachable!("The front bucket should always be loaded");
493                };
494                Some(
495                    data.get_mut(position)
496                        .expect("cursor.position must be a valid index within the front bucket"),
497                )
498            }
499            None => self.new_back_values.front_mut(),
500        }
501    }
502
503    /// Deletes the front value, if any.
504    /// ```rust
505    /// # tokio_test::block_on(async {
506    /// # use linera_views::context::MemoryContext;
507    /// # use linera_views::bucket_queue_view::BucketQueueView;
508    /// # use crate::linera_views::views::View;
509    /// # let context = MemoryContext::new_for_testing(());
510    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
511    /// queue.push_back(34 as u128);
512    /// queue.delete_front().await.unwrap();
513    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
514    /// # })
515    /// ```
516    pub async fn delete_front(&mut self) -> Result<(), ViewError> {
517        match self.cursor {
518            Some(cursor) => {
519                let mut offset = cursor.offset;
520                let mut position = cursor.position + 1;
521                if self.stored_buckets[offset].len() == position {
522                    offset += 1;
523                    position = 0;
524                }
525                if offset == self.stored_buckets.len() {
526                    self.cursor = None;
527                } else {
528                    if !self.stored_buckets[offset].is_loaded() {
529                        let index = self.stored_buckets[offset].index;
530                        let key = self.get_bucket_key(index)?;
531                        let data = self.context.store().read_value(&key).await?;
532                        let data = match data {
533                            Some(value) => value,
534                            None => {
535                                return Err(ViewError::MissingEntries(
536                                    "BucketQueueView::delete_front".into(),
537                                ));
538                            }
539                        };
540                        self.stored_buckets[offset].state = State::Loaded { data };
541                    }
542                    self.cursor = Some(Cursor { offset, position });
543                }
544            }
545            None => {
546                self.new_back_values.pop_front();
547            }
548        }
549        Ok(())
550    }
551
552    /// Pushes a value to the end of the queue.
553    /// ```rust
554    /// # tokio_test::block_on(async {
555    /// # use linera_views::context::MemoryContext;
556    /// # use linera_views::bucket_queue_view::BucketQueueView;
557    /// # use crate::linera_views::views::View;
558    /// # let context = MemoryContext::new_for_testing(());
559    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
560    /// queue.push_back(34);
561    /// assert_eq!(queue.elements().await.unwrap(), vec![34]);
562    /// # })
563    /// ```
564    pub fn push_back(&mut self, value: T) {
565        self.new_back_values.push_back(value);
566    }
567
568    /// Returns the list of elements in the queue.
569    /// ```rust
570    /// # tokio_test::block_on(async {
571    /// # use linera_views::context::MemoryContext;
572    /// # use linera_views::bucket_queue_view::BucketQueueView;
573    /// # use crate::linera_views::views::View;
574    /// # let context = MemoryContext::new_for_testing(());
575    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
576    /// queue.push_back(34);
577    /// queue.push_back(37);
578    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
579    /// # })
580    /// ```
581    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
582        let count = self.count();
583        self.read_context(self.cursor, count).await
584    }
585
586    /// Returns the last element of a bucket queue view
587    /// ```rust
588    /// # tokio_test::block_on(async {
589    /// # use linera_views::context::MemoryContext;
590    /// # use linera_views::bucket_queue_view::BucketQueueView;
591    /// # use crate::linera_views::views::View;
592    /// # let context = MemoryContext::new_for_testing(());
593    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
594    /// queue.push_back(34);
595    /// queue.push_back(37);
596    /// assert_eq!(queue.back().await.unwrap(), Some(37));
597    /// # })
598    /// ```
599    pub async fn back(&mut self) -> Result<Option<T>, ViewError>
600    where
601        T: Clone,
602    {
603        if let Some(value) = self.new_back_values.back() {
604            return Ok(Some(value.clone()));
605        }
606        if self.cursor.is_none() {
607            return Ok(None);
608        }
609        let Some(bucket) = self.stored_buckets.back() else {
610            return Ok(None);
611        };
612        match &bucket.state {
613            State::Loaded { data } => Ok(Some(
614                data.last().expect("a stored bucket is never empty").clone(),
615            )),
616            State::NotLoaded { .. } => {
617                let key = self.get_bucket_key(bucket.index)?;
618                let data = self
619                    .context
620                    .store()
621                    .read_value::<Vec<T>>(&key)
622                    .await?
623                    .ok_or_else(|| ViewError::MissingEntries("BucketQueueView::back".into()))?;
624                let result = data.last().expect("a stored bucket is never empty").clone();
625                self.stored_buckets
626                    .back_mut()
627                    .expect("stored_buckets is non-empty since we just accessed its back element")
628                    .state = State::Loaded { data };
629                Ok(Some(result))
630            }
631        }
632    }
633
634    async fn read_context(
635        &self,
636        cursor: Option<Cursor>,
637        count: usize,
638    ) -> Result<Vec<T>, ViewError> {
639        if count == 0 {
640            return Ok(Vec::new());
641        }
642        let mut elements = Vec::<T>::new();
643        let mut count_remain = count;
644        if let Some(cursor) = cursor {
645            let mut keys = Vec::new();
646            let mut position = cursor.position;
647            for offset in cursor.offset..self.stored_buckets.len() {
648                let bucket = &self.stored_buckets[offset];
649                let size = bucket.len() - position;
650                if !bucket.is_loaded() {
651                    let key = self.get_bucket_key(bucket.index)?;
652                    keys.push(key);
653                };
654                if size >= count_remain {
655                    break;
656                }
657                count_remain -= size;
658                position = 0;
659            }
660            let values = self.context.store().read_multi_values_bytes(&keys).await?;
661            let mut value_pos = 0;
662            count_remain = count;
663            let mut position = cursor.position;
664            for offset in cursor.offset..self.stored_buckets.len() {
665                let bucket = &self.stored_buckets[offset];
666                let size = bucket.len() - position;
667                let data = match &bucket.state {
668                    State::Loaded { data } => data,
669                    State::NotLoaded { .. } => {
670                        let value = match &values[value_pos] {
671                            Some(value) => value,
672                            None => {
673                                return Err(ViewError::MissingEntries(
674                                    "BucketQueueView::read_context".into(),
675                                ));
676                            }
677                        };
678                        value_pos += 1;
679                        &bcs::from_bytes::<Vec<T>>(value)?
680                    }
681                };
682                elements.extend(data[position..].iter().take(count_remain).cloned());
683                if size >= count_remain {
684                    return Ok(elements);
685                }
686                count_remain -= size;
687                position = 0;
688            }
689        }
690        let count_read = std::cmp::min(count_remain, self.new_back_values.len());
691        elements.extend(self.new_back_values.range(0..count_read).cloned());
692        Ok(elements)
693    }
694
695    /// Returns the first elements of a bucket queue view
696    /// ```rust
697    /// # tokio_test::block_on(async {
698    /// # use linera_views::context::MemoryContext;
699    /// # use linera_views::bucket_queue_view::BucketQueueView;
700    /// # use crate::linera_views::views::View;
701    /// # let context = MemoryContext::new_for_testing(());
702    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
703    /// queue.push_back(34);
704    /// queue.push_back(37);
705    /// queue.push_back(47);
706    /// assert_eq!(queue.read_front(2).await.unwrap(), vec![34, 37]);
707    /// # })
708    /// ```
709    pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
710        let count = std::cmp::min(count, self.count());
711        self.read_context(self.cursor, count).await
712    }
713
714    /// Returns the last element of a bucket queue view
715    /// ```rust
716    /// # tokio_test::block_on(async {
717    /// # use linera_views::context::MemoryContext;
718    /// # use linera_views::bucket_queue_view::BucketQueueView;
719    /// # use crate::linera_views::views::View;
720    /// # let context = MemoryContext::new_for_testing(());
721    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
722    /// queue.push_back(34);
723    /// queue.push_back(37);
724    /// queue.push_back(47);
725    /// assert_eq!(queue.read_back(2).await.unwrap(), vec![37, 47]);
726    /// # })
727    /// ```
728    pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
729        let count = std::cmp::min(count, self.count());
730        if count <= self.new_back_values.len() {
731            let start = self.new_back_values.len() - count;
732            Ok(self
733                .new_back_values
734                .range(start..)
735                .cloned()
736                .collect::<Vec<_>>())
737        } else {
738            let mut increment = self.count() - count;
739            let Some(cursor) = self.cursor else {
740                unreachable!("Cursor should be Some when stored_count > 0");
741            };
742            let mut position = cursor.position;
743            for offset in cursor.offset..self.stored_buckets.len() {
744                let size = self.stored_buckets[offset].len() - position;
745                if increment < size {
746                    return self
747                        .read_context(
748                            Some(Cursor {
749                                offset,
750                                position: position + increment,
751                            }),
752                            count,
753                        )
754                        .await;
755                }
756                increment -= size;
757                position = 0;
758            }
759            unreachable!("BucketQueueView::read_back: iterated past all stored buckets without finding the requested position");
760        }
761    }
762
763    async fn load_all(&mut self) -> Result<(), ViewError> {
764        if !self.delete_storage_first {
765            let elements = self.elements().await?;
766            self.new_back_values.clear();
767            for elt in elements {
768                self.new_back_values.push_back(elt);
769            }
770            self.cursor = None;
771            self.delete_storage_first = true;
772        }
773        Ok(())
774    }
775
776    /// Gets a mutable iterator on the entries of the queue
777    /// ```rust
778    /// # tokio_test::block_on(async {
779    /// # use linera_views::context::MemoryContext;
780    /// # use linera_views::bucket_queue_view::BucketQueueView;
781    /// # use linera_views::views::View;
782    /// # let context = MemoryContext::new_for_testing(());
783    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
784    /// queue.push_back(34);
785    /// let mut iter = queue.try_iter_mut().await.unwrap();
786    /// let value = iter.next().unwrap();
787    /// *value = 42;
788    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
789    /// # })
790    /// ```
791    pub async fn try_iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
792        self.load_all().await?;
793        Ok(self.new_back_values.iter_mut())
794    }
795}
796
797impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
798    for BucketQueueView<C, T, N>
799where
800    Self: View,
801{
802    type Hasher = sha3::Sha3_256;
803
804    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
805        self.hash().await
806    }
807
808    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
809        #[cfg(with_metrics)]
810        let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
811        let elements = self.elements().await?;
812        let mut hasher = sha3::Sha3_256::default();
813        hasher.update_with_bcs_bytes(&elements)?;
814        Ok(hasher.finalize())
815    }
816}
817
818/// Type wrapping `QueueView` while memoizing the hash.
819pub type HashedBucketQueueView<C, T, const N: usize> =
820    WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
821
822/// Wrapper around `BucketQueueView` to compute hashes based on the history of changes.
823pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
824    HistoricallyHashableView<C, BucketQueueView<C, T, N>>;
825
826#[cfg(with_graphql)]
827mod graphql {
828    use std::borrow::Cow;
829
830    use super::BucketQueueView;
831    use crate::{
832        context::Context,
833        graphql::{hash_name, mangle},
834    };
835
836    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
837        for BucketQueueView<C, T, N>
838    {
839        fn type_name() -> Cow<'static, str> {
840            format!(
841                "BucketQueueView_{}_{:08x}",
842                mangle(T::type_name()),
843                hash_name::<T>()
844            )
845            .into()
846        }
847    }
848
849    #[async_graphql::Object(cache_control(no_cache), name_type)]
850    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
851    where
852        C: Send + Sync,
853        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
854    {
855        #[graphql(derived(name = "count"))]
856        async fn count_(&self) -> Result<u32, async_graphql::Error> {
857            Ok(self.count() as u32)
858        }
859
860        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
861            Ok(self
862                .read_front(count.unwrap_or_else(|| self.count()))
863                .await?)
864        }
865    }
866}
867
868#[cfg(test)]
869mod tests {
870    use super::*;
871    use crate::{
872        batch::Batch,
873        context::{Context, MemoryContext},
874        store::WritableKeyValueStore as _,
875    };
876
877    /// Regression test: a failed load while advancing the cursor in
878    /// `delete_front` must not leave the view in a state where the bucket at
879    /// `cursor.offset` is `NotLoaded`. Previously the cursor was advanced
880    /// before the load was attempted, so a subsequent `pre_save` would hit
881    /// `unreachable!("The front bucket is always loaded.")`.
882    #[tokio::test]
883    async fn delete_front_load_failure_preserves_invariant() -> Result<(), ViewError> {
884        let context = MemoryContext::new_for_testing(());
885        let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;
886        for value in [1u8, 2, 3, 4] {
887            view.push_back(value);
888        }
889        save(&context, &mut view).await?;
890
891        let mut view = BucketQueueView::<_, u8, 2>::load(context.clone()).await?;
892
893        let bucket1_key = view.get_bucket_key(1)?;
894        let mut batch = Batch::new();
895        batch.delete_key(bucket1_key);
896        context.store().write_batch(batch).await?;
897
898        view.delete_front().await?;
899        let err = view.delete_front().await.expect_err("load should fail");
900        assert!(matches!(err, ViewError::MissingEntries(_)));
901
902        save(&context, &mut view).await?;
903
904        Ok(())
905    }
906
907    async fn save<V: View>(context: &V::Context, view: &mut V) -> Result<(), ViewError> {
908        let mut batch = Batch::new();
909        view.pre_save(&mut batch)?;
910        context.store().write_batch(batch).await?;
911        view.post_save();
912        Ok(())
913    }
914}