linera_views/views/
bucket_queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{vec_deque::IterMut, VecDeque};
5
6use allocative::Allocative;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10
11use crate::{
12    batch::Batch,
13    common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
14    context::Context,
15    hashable_wrapper::WrappedHashableContainerView,
16    historical_hash_wrapper::HistoricallyHashableView,
17    store::ReadableKeyValueStore as _,
18    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
19};
20
/// Prometheus metrics for [`BucketQueueView`]; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram of the time spent computing the hash of a `BucketQueueView`.
    ///
    /// The histogram is registered lazily, on first access, under the name
    /// `bucket_queue_view_hash_runtime`.
    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "bucket_queue_view_hash_runtime",
            "BucketQueueView hash runtime",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });
}
38
/// Key tags to create the sub-keys of a [`BucketQueueView`] on top of the base key.
///
/// The enum is `#[repr(u8)]` so that each variant can be turned into a tag byte with
/// `as u8`. The first variant starts at `MIN_VIEW_TAG` and the following ones take the
/// next consecutive values.
#[repr(u8)]
enum KeyTag {
    /// Key tag for the front bucket (index 0).
    Front = MIN_VIEW_TAG,
    /// Key tag for the `BucketStore`.
    Store,
    /// Key tag for the content of non-front buckets (index > 0).
    Index,
}
49
/// The metadata of the view in storage.
///
/// This value is serialized under the `KeyTag::Store` key. When that key is absent the
/// view is loaded from the `Default` value (no descriptions, front position 0).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct BucketStore {
    /// The descriptions of all stored buckets. The first description is expected to start
    /// with index 0 (front bucket) and will be ignored.
    descriptions: Vec<BucketDescription>,
    /// The position of the front value in the front bucket.
    front_position: usize,
}
59
/// The description of a bucket in storage.
///
/// Descriptions are kept in the [`BucketStore`] so that the length of a bucket is known
/// without reading its content.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
struct BucketDescription {
    /// The length of the bucket (at most N).
    length: usize,
    /// The index of the bucket in storage.
    index: usize,
}
68
69impl BucketStore {
70    fn len(&self) -> usize {
71        self.descriptions.len()
72    }
73}
74
/// The position of a value in the stored buckets.
#[derive(Copy, Clone, Debug, Allocative)]
struct Cursor {
    /// The offset of the bucket in the vector of stored buckets.
    offset: usize,
    /// The position of the value in the stored bucket.
    position: usize,
}
83
/// The state of a stored bucket in memory.
#[derive(Clone, Debug, Allocative)]
enum State<T> {
    /// The values of the bucket are available in memory.
    Loaded { data: Vec<T> },
    /// The values have not been read from storage; only the length is known.
    NotLoaded { length: usize },
}
90
91impl<T> Bucket<T> {
92    fn len(&self) -> usize {
93        match &self.state {
94            State::Loaded { data } => data.len(),
95            State::NotLoaded { length } => *length,
96        }
97    }
98
99    fn is_loaded(&self) -> bool {
100        match self.state {
101            State::Loaded { .. } => true,
102            State::NotLoaded { .. } => false,
103        }
104    }
105
106    fn to_description(&self) -> BucketDescription {
107        BucketDescription {
108            length: self.len(),
109            index: self.index,
110        }
111    }
112}
113
/// A stored bucket.
///
/// A bucket is a contiguous chunk of the queue that is written under a single key.
#[derive(Clone, Debug, Allocative)]
struct Bucket<T> {
    /// The index in storage. Index 0 designates the front bucket.
    index: usize,
    /// The state of the bucket.
    state: State<T>,
}
122
/// A view that supports a FIFO queue for values of type `T`.
///
/// Values are persisted in buckets holding at most `N` values each. The size `N` has to
/// be chosen by taking into account the size of the type `T` and the basic size of a
/// block. For example a total size of 100 bytes to 10 KB seems adequate.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative, const N: usize")]
pub struct BucketQueueView<C, T, const N: usize> {
    /// The view context.
    #[allocative(skip)]
    context: C,
    /// The stored buckets. Some buckets may not be loaded. The first one is always loaded.
    stored_buckets: VecDeque<Bucket<T>>,
    /// The newly inserted back values.
    new_back_values: VecDeque<T>,
    /// The position for the stored front value in the first stored bucket.
    stored_front_position: usize,
    /// The current position of the front value if it is in the stored buckets, and `None`
    /// otherwise.
    cursor: Option<Cursor>,
    /// Whether the storage is to be deleted or not.
    delete_storage_first: bool,
}
146
147impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
148where
149    C: Context,
150    T: Send + Sync + Clone + Serialize + DeserializeOwned,
151{
    /// Number of keys returned by `pre_load`: the front bucket and the bucket store.
    const NUM_INIT_KEYS: usize = 2;

    /// The context type of this view.
    type Context = C;

    /// Returns a clone of the view context.
    fn context(&self) -> C {
        self.context.clone()
    }
159
160    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
161        let key1 = context.base_key().base_tag(KeyTag::Front as u8);
162        let key2 = context.base_key().base_tag(KeyTag::Store as u8);
163        Ok(vec![key1, key2])
164    }
165
166    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
167        let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
168        let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
169        let front = from_bytes_option::<Vec<T>>(value1)?;
170        let mut stored_buckets = VecDeque::from(match front {
171            Some(data) => {
172                let bucket = Bucket {
173                    index: 0,
174                    state: State::Loaded { data },
175                };
176                vec![bucket]
177            }
178            None => {
179                vec![]
180            }
181        });
182        let bucket_store = from_bytes_option_or_default::<BucketStore>(value2)?;
183        // Ignoring `bucket_store.descriptions[0]`.
184        // TODO(#4969): Remove redundant BucketDescription in BucketQueueView.
185        for i in 1..bucket_store.len() {
186            let length = bucket_store.descriptions[i].length;
187            let index = bucket_store.descriptions[i].index;
188            stored_buckets.push_back(Bucket {
189                index,
190                state: State::NotLoaded { length },
191            });
192        }
193        let cursor = if bucket_store.descriptions.is_empty() {
194            None
195        } else {
196            Some(Cursor {
197                offset: 0,
198                position: bucket_store.front_position,
199            })
200        };
201        Ok(Self {
202            context,
203            stored_buckets,
204            stored_front_position: bucket_store.front_position,
205            new_back_values: VecDeque::new(),
206            cursor,
207            delete_storage_first: false,
208        })
209    }
210
211    fn rollback(&mut self) {
212        self.delete_storage_first = false;
213        self.cursor = if self.stored_buckets.is_empty() {
214            None
215        } else {
216            Some(Cursor {
217                offset: 0,
218                position: self.stored_front_position,
219            })
220        };
221        self.new_back_values.clear();
222    }
223
224    async fn has_pending_changes(&self) -> bool {
225        if self.delete_storage_first {
226            return true;
227        }
228        if !self.stored_buckets.is_empty() {
229            let Some(cursor) = self.cursor else {
230                return true;
231            };
232            if cursor.offset != 0 || cursor.position != self.stored_front_position {
233                return true;
234            }
235        }
236        !self.new_back_values.is_empty()
237    }
238
239    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
240        let mut delete_view = false;
241        let mut descriptions = Vec::new();
242        let mut stored_front_position = self.stored_front_position;
243        if self.stored_count() == 0 {
244            let key_prefix = self.context.base_key().bytes.clone();
245            batch.delete_key_prefix(key_prefix);
246            delete_view = true;
247            stored_front_position = 0;
248        } else if let Some(cursor) = self.cursor {
249            // Delete buckets that are before the cursor
250            for i in 0..cursor.offset {
251                let bucket = &self.stored_buckets[i];
252                let index = bucket.index;
253                let key = self.get_bucket_key(index)?;
254                batch.delete_key(key);
255            }
256            stored_front_position = cursor.position;
257            // Build descriptions for remaining buckets
258            let first_index = self.stored_buckets[cursor.offset].index;
259            let start_offset = if first_index != 0 {
260                // Need to move the first remaining bucket to index 0
261                let key = self.get_bucket_key(first_index)?;
262                batch.delete_key(key);
263                let key = self.get_bucket_key(0)?;
264                let bucket = &self.stored_buckets[cursor.offset];
265                let State::Loaded { data } = &bucket.state else {
266                    unreachable!("The front bucket is always loaded.");
267                };
268                batch.put_key_value(key, data)?;
269                descriptions.push(BucketDescription {
270                    length: bucket.len(),
271                    index: 0,
272                });
273                cursor.offset + 1
274            } else {
275                cursor.offset
276            };
277            for bucket in self.stored_buckets.range(start_offset..) {
278                descriptions.push(bucket.to_description());
279            }
280        }
281        if !self.new_back_values.is_empty() {
282            delete_view = false;
283            // Calculate the starting index for new buckets
284            // If stored_count() == 0, all stored buckets are being removed, so start at 0
285            // Otherwise, start after the last remaining bucket
286            let mut index = if self.stored_count() == 0 {
287                0
288            } else if let Some(last_description) = descriptions.last() {
289                last_description.index + 1
290            } else {
291                // This shouldn't happen if stored_count() > 0
292                0
293            };
294            let mut start = 0;
295            while start < self.new_back_values.len() {
296                let end = std::cmp::min(start + N, self.new_back_values.len());
297                let value_chunk: Vec<_> = self.new_back_values.range(start..end).collect();
298                let key = self.get_bucket_key(index)?;
299                batch.put_key_value(key, &value_chunk)?;
300                descriptions.push(BucketDescription {
301                    index,
302                    length: end - start,
303                });
304                index += 1;
305                start = end;
306            }
307        }
308        if !delete_view {
309            let bucket_store = BucketStore {
310                descriptions,
311                front_position: stored_front_position,
312            };
313            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
314            batch.put_key_value(key, &bucket_store)?;
315        }
316        Ok(delete_view)
317    }
318
319    fn post_save(&mut self) {
320        if self.stored_count() == 0 {
321            self.stored_buckets.clear();
322            self.stored_front_position = 0;
323            self.cursor = None;
324        } else if let Some(cursor) = self.cursor {
325            for _ in 0..cursor.offset {
326                self.stored_buckets.pop_front();
327            }
328            self.cursor = Some(Cursor {
329                offset: 0,
330                position: cursor.position,
331            });
332            self.stored_front_position = cursor.position;
333            // We need to ensure that the first index is in the front.
334            self.stored_buckets[0].index = 0;
335        }
336        if !self.new_back_values.is_empty() {
337            let mut index = match self.stored_buckets.back() {
338                Some(bucket) => bucket.index + 1,
339                None => 0,
340            };
341            let new_back_values = std::mem::take(&mut self.new_back_values);
342            let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
343            for value_chunk in new_back_values.chunks(N) {
344                self.stored_buckets.push_back(Bucket {
345                    index,
346                    state: State::Loaded {
347                        data: value_chunk.to_vec(),
348                    },
349                });
350                index += 1;
351            }
352            if self.cursor.is_none() {
353                self.cursor = Some(Cursor {
354                    offset: 0,
355                    position: 0,
356                });
357            }
358        }
359        self.delete_storage_first = false;
360    }
361
    /// Empties the queue.
    ///
    /// The stored buckets are kept in memory; the storage is only flagged for deletion
    /// and the cursor is reset, so that `rollback` can restore the stored state.
    fn clear(&mut self) {
        self.delete_storage_first = true;
        self.new_back_values.clear();
        // Without a cursor, no stored value is visible any more.
        self.cursor = None;
    }
367}
368
369impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
370where
371    Self: View,
372{
373    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
374        Ok(BucketQueueView {
375            context: self.context.clone(),
376            stored_buckets: self.stored_buckets.clone(),
377            new_back_values: self.new_back_values.clone(),
378            stored_front_position: self.stored_front_position,
379            cursor: self.cursor,
380            delete_storage_first: self.delete_storage_first,
381        })
382    }
383}
384
385impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
386    /// Gets the key corresponding to this bucket index.
387    fn get_bucket_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
388        Ok(if index == 0 {
389            self.context.base_key().base_tag(KeyTag::Front as u8)
390        } else {
391            self.context
392                .base_key()
393                .derive_tag_key(KeyTag::Index as u8, &index)?
394        })
395    }
396
397    /// Gets the number of entries in the container that are stored
398    /// ```rust
399    /// # tokio_test::block_on(async {
400    /// # use linera_views::context::MemoryContext;
401    /// # use linera_views::bucket_queue_view::BucketQueueView;
402    /// # use crate::linera_views::views::View;
403    /// # let context = MemoryContext::new_for_testing(());
404    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
405    /// queue.push_back(34);
406    /// assert_eq!(queue.stored_count(), 0);
407    /// # })
408    /// ```
409    pub fn stored_count(&self) -> usize {
410        if self.delete_storage_first {
411            0
412        } else {
413            let Some(cursor) = self.cursor else {
414                return 0;
415            };
416            let mut stored_count = 0;
417            for offset in cursor.offset..self.stored_buckets.len() {
418                stored_count += self.stored_buckets[offset].len();
419            }
420            stored_count -= cursor.position;
421            stored_count
422        }
423    }
424
    /// The total number of entries of the container: the remaining stored entries plus
    /// the values pushed since the last save.
    ///
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use crate::linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// assert_eq!(queue.count(), 1);
    /// # })
    /// ```
    pub fn count(&self) -> usize {
        self.stored_count() + self.new_back_values.len()
    }
440}
441
442impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
443    /// Gets a reference on the front value if any.
444    /// ```rust
445    /// # tokio_test::block_on(async {
446    /// # use linera_views::context::MemoryContext;
447    /// # use linera_views::bucket_queue_view::BucketQueueView;
448    /// # use crate::linera_views::views::View;
449    /// # let context = MemoryContext::new_for_testing(());
450    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
451    /// queue.push_back(34);
452    /// queue.push_back(42);
453    /// assert_eq!(queue.front().cloned(), Some(34));
454    /// # })
455    /// ```
456    pub fn front(&self) -> Option<&T> {
457        match self.cursor {
458            Some(Cursor { offset, position }) => {
459                let bucket = &self.stored_buckets[offset];
460                let State::Loaded { data } = &bucket.state else {
461                    unreachable!();
462                };
463                Some(&data[position])
464            }
465            None => self.new_back_values.front(),
466        }
467    }
468
469    /// Reads the front value, if any.
470    /// ```rust
471    /// # tokio_test::block_on(async {
472    /// # use linera_views::context::MemoryContext;
473    /// # use linera_views::bucket_queue_view::BucketQueueView;
474    /// # use crate::linera_views::views::View;
475    /// # let context = MemoryContext::new_for_testing(());
476    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
477    /// queue.push_back(34);
478    /// queue.push_back(42);
479    /// let front = queue.front_mut().unwrap();
480    /// *front = 43;
481    /// assert_eq!(queue.front().cloned(), Some(43));
482    /// # })
483    /// ```
484    pub fn front_mut(&mut self) -> Option<&mut T> {
485        match self.cursor {
486            Some(Cursor { offset, position }) => {
487                let bucket = self.stored_buckets.get_mut(offset).unwrap();
488                let State::Loaded { data } = &mut bucket.state else {
489                    unreachable!();
490                };
491                Some(data.get_mut(position).unwrap())
492            }
493            None => self.new_back_values.front_mut(),
494        }
495    }
496
497    /// Deletes the front value, if any.
498    /// ```rust
499    /// # tokio_test::block_on(async {
500    /// # use linera_views::context::MemoryContext;
501    /// # use linera_views::bucket_queue_view::BucketQueueView;
502    /// # use crate::linera_views::views::View;
503    /// # let context = MemoryContext::new_for_testing(());
504    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
505    /// queue.push_back(34 as u128);
506    /// queue.delete_front().await.unwrap();
507    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
508    /// # })
509    /// ```
510    pub async fn delete_front(&mut self) -> Result<(), ViewError> {
511        match self.cursor {
512            Some(cursor) => {
513                let mut offset = cursor.offset;
514                let mut position = cursor.position + 1;
515                if self.stored_buckets[offset].len() == position {
516                    offset += 1;
517                    position = 0;
518                }
519                if offset == self.stored_buckets.len() {
520                    self.cursor = None;
521                } else {
522                    self.cursor = Some(Cursor { offset, position });
523                    let bucket = self.stored_buckets.get_mut(offset).unwrap();
524                    let index = bucket.index;
525                    if !bucket.is_loaded() {
526                        let key = self.get_bucket_key(index)?;
527                        let data = self.context.store().read_value(&key).await?;
528                        let data = match data {
529                            Some(value) => value,
530                            None => {
531                                return Err(ViewError::MissingEntries(
532                                    "BucketQueueView::delete_front".into(),
533                                ));
534                            }
535                        };
536                        self.stored_buckets[offset].state = State::Loaded { data };
537                    }
538                }
539            }
540            None => {
541                self.new_back_values.pop_front();
542            }
543        }
544        Ok(())
545    }
546
    /// Pushes a value to the end of the queue.
    ///
    /// The value is kept in memory with the other pushed values until the view is saved.
    ///
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use crate::linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// assert_eq!(queue.elements().await.unwrap(), vec![34]);
    /// # })
    /// ```
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }
562
    /// Returns the list of elements in the queue, from front to back.
    ///
    /// Stored buckets that are not loaded are read from storage, but the view itself is
    /// not modified.
    ///
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use crate::linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(37);
    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
    /// # })
    /// ```
    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
        let count = self.count();
        self.read_context(self.cursor, count).await
    }
580
581    /// Returns the last element of a bucket queue view
582    /// ```rust
583    /// # tokio_test::block_on(async {
584    /// # use linera_views::context::MemoryContext;
585    /// # use linera_views::bucket_queue_view::BucketQueueView;
586    /// # use crate::linera_views::views::View;
587    /// # let context = MemoryContext::new_for_testing(());
588    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
589    /// queue.push_back(34);
590    /// queue.push_back(37);
591    /// assert_eq!(queue.back().await.unwrap(), Some(37));
592    /// # })
593    /// ```
594    pub async fn back(&mut self) -> Result<Option<T>, ViewError>
595    where
596        T: Clone,
597    {
598        if let Some(value) = self.new_back_values.back() {
599            return Ok(Some(value.clone()));
600        }
601        if self.cursor.is_none() {
602            return Ok(None);
603        }
604        let Some(bucket) = self.stored_buckets.back() else {
605            return Ok(None);
606        };
607        if !bucket.is_loaded() {
608            let key = self.get_bucket_key(bucket.index)?;
609            let data = self.context.store().read_value(&key).await?;
610            let data = match data {
611                Some(data) => data,
612                None => {
613                    return Err(ViewError::MissingEntries("BucketQueueView::back".into()));
614                }
615            };
616            self.stored_buckets.back_mut().unwrap().state = State::Loaded { data };
617        }
618        let state = &self.stored_buckets.back_mut().unwrap().state;
619        let State::Loaded { data } = state else {
620            unreachable!();
621        };
622        Ok(Some(data.last().unwrap().clone()))
623    }
624
625    async fn read_context(
626        &self,
627        cursor: Option<Cursor>,
628        count: usize,
629    ) -> Result<Vec<T>, ViewError> {
630        if count == 0 {
631            return Ok(Vec::new());
632        }
633        let mut elements = Vec::<T>::new();
634        let mut count_remain = count;
635        if let Some(cursor) = cursor {
636            let mut keys = Vec::new();
637            let mut position = cursor.position;
638            for offset in cursor.offset..self.stored_buckets.len() {
639                let bucket = &self.stored_buckets[offset];
640                let size = bucket.len() - position;
641                if !bucket.is_loaded() {
642                    let key = self.get_bucket_key(bucket.index)?;
643                    keys.push(key);
644                };
645                if size >= count_remain {
646                    break;
647                }
648                count_remain -= size;
649                position = 0;
650            }
651            let values = self.context.store().read_multi_values_bytes(&keys).await?;
652            let mut value_pos = 0;
653            count_remain = count;
654            let mut position = cursor.position;
655            for offset in cursor.offset..self.stored_buckets.len() {
656                let bucket = &self.stored_buckets[offset];
657                let size = bucket.len() - position;
658                let data = match &bucket.state {
659                    State::Loaded { data } => data,
660                    State::NotLoaded { .. } => {
661                        let value = match &values[value_pos] {
662                            Some(value) => value,
663                            None => {
664                                return Err(ViewError::MissingEntries(
665                                    "BucketQueueView::read_context".into(),
666                                ));
667                            }
668                        };
669                        value_pos += 1;
670                        &bcs::from_bytes::<Vec<T>>(value)?
671                    }
672                };
673                elements.extend(data[position..].iter().take(count_remain).cloned());
674                if size >= count_remain {
675                    return Ok(elements);
676                }
677                count_remain -= size;
678                position = 0;
679            }
680        }
681        let count_read = std::cmp::min(count_remain, self.new_back_values.len());
682        elements.extend(self.new_back_values.range(0..count_read).cloned());
683        Ok(elements)
684    }
685
    /// Returns the first elements of a bucket queue view.
    ///
    /// At most `count` elements are returned, fewer if the queue is shorter.
    ///
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use crate::linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(37);
    /// queue.push_back(47);
    /// assert_eq!(queue.read_front(2).await.unwrap(), vec![34, 37]);
    /// # })
    /// ```
    pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
        // Never ask for more values than the queue contains.
        let count = std::cmp::min(count, self.count());
        self.read_context(self.cursor, count).await
    }
704
705    /// Returns the last element of a bucket queue view
706    /// ```rust
707    /// # tokio_test::block_on(async {
708    /// # use linera_views::context::MemoryContext;
709    /// # use linera_views::bucket_queue_view::BucketQueueView;
710    /// # use crate::linera_views::views::View;
711    /// # let context = MemoryContext::new_for_testing(());
712    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
713    /// queue.push_back(34);
714    /// queue.push_back(37);
715    /// queue.push_back(47);
716    /// assert_eq!(queue.read_back(2).await.unwrap(), vec![37, 47]);
717    /// # })
718    /// ```
719    pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
720        let count = std::cmp::min(count, self.count());
721        if count <= self.new_back_values.len() {
722            let start = self.new_back_values.len() - count;
723            Ok(self
724                .new_back_values
725                .range(start..)
726                .cloned()
727                .collect::<Vec<_>>())
728        } else {
729            let mut increment = self.count() - count;
730            let Some(cursor) = self.cursor else {
731                unreachable!();
732            };
733            let mut position = cursor.position;
734            for offset in cursor.offset..self.stored_buckets.len() {
735                let size = self.stored_buckets[offset].len() - position;
736                if increment < size {
737                    return self
738                        .read_context(
739                            Some(Cursor {
740                                offset,
741                                position: position + increment,
742                            }),
743                            count,
744                        )
745                        .await;
746                }
747                increment -= size;
748                position = 0;
749            }
750            unreachable!();
751        }
752    }
753
754    async fn load_all(&mut self) -> Result<(), ViewError> {
755        if !self.delete_storage_first {
756            let elements = self.elements().await?;
757            self.new_back_values.clear();
758            for elt in elements {
759                self.new_back_values.push_back(elt);
760            }
761            self.cursor = None;
762            self.delete_storage_first = true;
763        }
764        Ok(())
765    }
766
    /// Gets a mutable iterator on the entries of the queue.
    ///
    /// All the entries are first loaded in memory with `load_all`, which also flags the
    /// storage for deletion, so the queue is fully rewritten by the next save.
    ///
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// let mut iter = queue.iter_mut().await.unwrap();
    /// let value = iter.next().unwrap();
    /// *value = 42;
    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
    /// # })
    /// ```
    pub async fn iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
786}
787
impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
    for BucketQueueView<C, T, N>
where
    Self: View,
{
    type Hasher = sha3::Sha3_256;

    /// Computes the hash of the queue; same as `hash`.
    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.hash().await
    }

    /// Computes the hash of the BCS serialization of the list of elements of the queue.
    ///
    /// All the elements are read for this, including those in buckets not loaded yet.
    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        // The binding is kept until the end of the function so that the whole
        // computation is measured.
        #[cfg(with_metrics)]
        let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}
808
/// Type wrapping `BucketQueueView` while memoizing the hash.
pub type HashedBucketQueueView<C, T, const N: usize> =
    WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
812
/// Wrapper around [`BucketQueueView`] to compute hashes based on the history of changes.
pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
    HistoricallyHashableView<C, BucketQueueView<C, T, N>>;
816
// GraphQL support for `BucketQueueView`; only compiled when `with_graphql` is set.
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::BucketQueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
        for BucketQueueView<C, T, N>
    {
        // The name is built from the mangled name of `T` followed by a hash of `T`
        // written as 8 hexadecimal digits.
        fn type_name() -> Cow<'static, str> {
            format!(
                "BucketQueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    // Plain `//` comments are used below on purpose: doc comments on the fields of an
    // `async_graphql::Object` would become descriptions in the generated schema.
    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
    where
        C: Send + Sync,
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Number of entries in the queue. A count that does not fit in a `u32` is
        // reported as an error instead of being silently truncated.
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            Ok(u32::try_from(self.count())?)
        }

        // The first `count` entries of the queue, or all of them if `count` is absent.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}