linera_views/views/
bucket_queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{vec_deque::IterMut, VecDeque};
5
6#[cfg(with_metrics)]
7use linera_base::prometheus_util::MeasureLatency as _;
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9
10use crate::{
11    batch::Batch,
12    common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
13    context::Context,
14    hashable_wrapper::WrappedHashableContainerView,
15    store::ReadableKeyValueStore as _,
16    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
17};
18
// Prometheus metrics of this view; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// The runtime of hash computation.
    ///
    /// The histogram is created lazily on first access and is observed by the
    /// `hash` method of `BucketQueueView`.
    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "bucket_queue_view_hash_runtime",
            "BucketQueueView hash runtime",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });
}
36
/// Key tags to create the sub-keys of a [`BucketQueueView`] on top of the base key.
/// * The `Front` key holds the bucket of index 0; it is read when the view is loaded.
/// * The `Store` key holds the [`StoredIndices`] describing the stored buckets; it is
///   also read when the view is loaded.
/// * The `Index` tag is the prefix of the keys of the buckets with a nonzero index.
#[repr(u8)]
enum KeyTag {
    /// Key of the front bucket (the bucket of index 0).
    Front = MIN_VIEW_TAG,
    /// Key of the serialized [`StoredIndices`].
    Store,
    /// Prefix for the keys of the buckets whose index is nonzero.
    Index,
}
50
/// The `StoredIndices` contains the description of the stored buckets.
/// It is the value serialized under the `KeyTag::Store` key.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct StoredIndices {
    /// One pair per stored bucket, in queue order. The first entry of a pair is the
    /// number of values in the bucket (at most N) and the second one is the index of
    /// the bucket in the storage. If the index is 0 then the bucket is stored under
    /// the `KeyTag::Front` key, otherwise under a key with prefix `KeyTag::Index`.
    indices: Vec<(usize, usize)>,
    /// The position of the front of the queue inside the first bucket.
    position: usize,
}

impl StoredIndices {
    /// Returns the number of stored buckets that are described.
    fn len(&self) -> usize {
        self.indices.len()
    }
}
68
/// The `Cursor` locates the front of the queue inside the stored buckets.
///
/// * `None` means that no stored entry remains in the queue, so the front
///   operations work on the `new_back_values` instead.
/// * `Some((i_block, position))` means that the front is the entry at offset
///   `position` inside the bucket found at `stored_data[i_block]`.
#[derive(Clone, Debug)]
struct Cursor {
    position: Option<(usize, usize)>,
}

impl Cursor {
    /// Builds the cursor for a view holding `number_bucket` stored buckets whose
    /// front sits at offset `position` of the first bucket. Without any stored
    /// bucket the cursor has no position.
    pub fn new(number_bucket: usize, position: usize) -> Self {
        let position = (number_bucket != 0).then_some((0, position));
        Cursor { position }
    }

    /// Tells whether the cursor currently points into the stored buckets.
    pub fn is_incrementable(&self) -> bool {
        self.position.is_some()
    }
}
94
/// A bucket of consecutive queue entries, as tracked in memory.
///
/// A bucket either carries its values (`Loaded`) or only remembers how many
/// values it contains (`NotLoaded`).
#[derive(Clone, Debug)]
enum Bucket<T> {
    Loaded { data: Vec<T> },
    NotLoaded { length: usize },
}

impl<T> Bucket<T> {
    /// Returns the number of values of the bucket, whether it is loaded or not.
    fn len(&self) -> usize {
        match *self {
            Bucket::NotLoaded { length } => length,
            Bucket::Loaded { ref data } => data.len(),
        }
    }

    /// Tells whether the values of the bucket are available in memory.
    fn is_loaded(&self) -> bool {
        matches!(self, Bucket::Loaded { .. })
    }
}
116
117fn stored_indices<T>(stored_data: &VecDeque<(usize, Bucket<T>)>, position: usize) -> StoredIndices {
118    let indices = stored_data
119        .iter()
120        .map(|(index, bucket)| (bucket.len(), *index))
121        .collect::<Vec<_>>();
122    StoredIndices { indices, position }
123}
124
/// A view that supports a FIFO queue for values of type `T`.
///
/// The values are stored in buckets of at most `N` values each.
/// The size `N` has to be chosen by taking into account the size of the type `T`
/// and the basic size of a block. For example a total size of 100 bytes to 10 KB
/// seems adequate.
#[derive(Debug)]
pub struct BucketQueueView<C, T, const N: usize> {
    context: C,
    /// The stored buckets, each paired with its index in the storage. A bucket is
    /// `Bucket::NotLoaded` as long as its values have not been read from the
    /// storage. The first bucket is loaded together with the view.
    stored_data: VecDeque<(usize, Bucket<T>)>,
    /// The newly inserted back values, not yet written to the storage.
    new_back_values: VecDeque<T>,
    /// The position of the front inside the first bucket, as last loaded or flushed.
    stored_position: usize,
    /// The current position of the front in the `stored_data`.
    cursor: Cursor,
    /// Whether the stored data has to be deleted by the next flush.
    delete_storage_first: bool,
}
143
144impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
145where
146    C: Context,
147    T: Send + Sync + Clone + Serialize + DeserializeOwned,
148{
149    const NUM_INIT_KEYS: usize = 2;
150
151    type Context = C;
152
    /// Returns a reference to the context held by this view.
    fn context(&self) -> &C {
        &self.context
    }
156
157    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
158        let key1 = context.base_key().base_tag(KeyTag::Front as u8);
159        let key2 = context.base_key().base_tag(KeyTag::Store as u8);
160        Ok(vec![key1, key2])
161    }
162
163    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
164        let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
165        let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
166        let front = from_bytes_option::<Vec<T>>(value1)?;
167        let mut stored_data = VecDeque::from(match front {
168            Some(front) => {
169                vec![(0, Bucket::Loaded { data: front })]
170            }
171            None => {
172                vec![]
173            }
174        });
175        let stored_indices = from_bytes_option_or_default::<StoredIndices>(value2)?;
176        for i in 1..stored_indices.len() {
177            let length = stored_indices.indices[i].0;
178            let index = stored_indices.indices[i].1;
179            stored_data.push_back((index, Bucket::NotLoaded { length }));
180        }
181        let cursor = Cursor::new(stored_indices.len(), stored_indices.position);
182        Ok(Self {
183            context,
184            stored_data,
185            stored_position: stored_indices.position,
186            new_back_values: VecDeque::new(),
187            cursor,
188            delete_storage_first: false,
189        })
190    }
191
192    async fn load(context: C) -> Result<Self, ViewError> {
193        let keys = Self::pre_load(&context)?;
194        let values = context.store().read_multi_values_bytes(keys).await?;
195        Self::post_load(context, &values)
196    }
197
198    fn rollback(&mut self) {
199        self.delete_storage_first = false;
200        self.cursor = Cursor::new(self.stored_data.len(), self.stored_position);
201        self.new_back_values.clear();
202    }
203
204    async fn has_pending_changes(&self) -> bool {
205        if self.delete_storage_first {
206            return true;
207        }
208        if !self.stored_data.is_empty() {
209            let Some((i_block, position)) = self.cursor.position else {
210                return true;
211            };
212            if i_block != 0 || position != self.stored_position {
213                return true;
214            }
215        }
216        !self.new_back_values.is_empty()
217    }
218
    /// Writes the pending changes of the view into `batch` and brings the in-memory
    /// state in line with what the storage will contain once the batch is applied.
    ///
    /// Returns `true` only when the deletion of the storage was requested and no
    /// new back value had to be written.
    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.delete_storage_first {
            let key_prefix = self.context.base_key().bytes.clone();
            batch.delete_key_prefix(key_prefix);
            delete_view = true;
        }
        if self.stored_count() == 0 {
            // No stored entry is left in the queue: delete every key under the base
            // key and forget the buckets.
            let key_prefix = self.context.base_key().bytes.clone();
            batch.delete_key_prefix(key_prefix);
            self.stored_data.clear();
            self.stored_position = 0;
        } else if let Some((i_block, position)) = self.cursor.position {
            // The buckets located before the cursor were entirely consumed: drop
            // them and delete their keys.
            for _ in 0..i_block {
                let block = self.stored_data.pop_front().unwrap();
                let index = block.0;
                let key = self.get_index_key(index)?;
                batch.delete_key(key);
            }
            self.cursor = Cursor {
                position: Some((0, position)),
            };
            self.stored_position = position;
            // We need to ensure that the first index is in the front.
            let first_index = self.stored_data[0].0;
            if first_index != 0 {
                // Move the bucket to the index 0: delete its former key and write
                // its values under the front key.
                self.stored_data[0].0 = 0;
                let key = self.get_index_key(first_index)?;
                batch.delete_key(key);
                let key = self.get_index_key(0)?;
                let (_, data0) = self.stored_data.front().unwrap();
                // The bucket under the cursor is expected to be loaded, since
                // `delete_front` loads the bucket it advances to.
                let Bucket::Loaded { data } = data0 else {
                    unreachable!();
                };
                batch.put_key_value(key, &data)?;
            }
        }
        if !self.new_back_values.is_empty() {
            // Values are written, so the view is not deleted.
            delete_view = false;
            // The new buckets are numbered after the last stored bucket, starting
            // from 0 (the front key) when nothing is stored.
            let mut unused_index = match self.stored_data.back() {
                Some((index, _)) => index + 1,
                None => 0,
            };
            let new_back_values = std::mem::take(&mut self.new_back_values);
            let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
            // Each chunk of at most N values becomes one bucket, kept loaded.
            for value_chunk in new_back_values.chunks(N) {
                let key = self.get_index_key(unused_index)?;
                batch.put_key_value(key, &value_chunk)?;
                self.stored_data.push_back((
                    unused_index,
                    Bucket::Loaded {
                        data: value_chunk.to_vec(),
                    },
                ));
                unused_index += 1;
            }
            // Without a cursor, the front is now the first value of the first bucket.
            if !self.cursor.is_incrementable() {
                self.cursor = Cursor {
                    position: Some((0, 0)),
                }
            }
        }
        // The description of the buckets is written, except when the storage was
        // deleted and no bucket remains.
        if !self.delete_storage_first || !self.stored_data.is_empty() {
            let stored_indices = stored_indices(&self.stored_data, self.stored_position);
            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
            batch.put_key_value(key, &stored_indices)?;
        }
        self.delete_storage_first = false;
        Ok(delete_view)
    }
289
290    fn clear(&mut self) {
291        self.delete_storage_first = true;
292        self.new_back_values.clear();
293        self.cursor.position = None;
294    }
295}
296
297impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
298where
299    Self: View,
300{
301    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
302        Ok(BucketQueueView {
303            context: self.context.clone(),
304            stored_data: self.stored_data.clone(),
305            new_back_values: self.new_back_values.clone(),
306            stored_position: self.stored_position,
307            cursor: self.cursor.clone(),
308            delete_storage_first: self.delete_storage_first,
309        })
310    }
311}
312
313impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
314    /// Gets the key corresponding to the index
315    fn get_index_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
316        Ok(if index == 0 {
317            self.context.base_key().base_tag(KeyTag::Front as u8)
318        } else {
319            self.context
320                .base_key()
321                .derive_tag_key(KeyTag::Index as u8, &index)?
322        })
323    }
324
325    /// Gets the number of entries in the container that are stored
326    /// ```rust
327    /// # tokio_test::block_on(async {
328    /// # use linera_views::context::MemoryContext;
329    /// # use linera_views::bucket_queue_view::BucketQueueView;
330    /// # use crate::linera_views::views::View;
331    /// # let context = MemoryContext::new_for_testing(());
332    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
333    /// queue.push_back(34);
334    /// assert_eq!(queue.stored_count(), 0);
335    /// # })
336    /// ```
337    pub fn stored_count(&self) -> usize {
338        if self.delete_storage_first {
339            0
340        } else {
341            let Some((i_block, position)) = self.cursor.position else {
342                return 0;
343            };
344            let mut stored_count = 0;
345            for block in i_block..self.stored_data.len() {
346                stored_count += self.stored_data[block].1.len();
347            }
348            stored_count -= position;
349            stored_count
350        }
351    }
352
353    /// The total number of entries of the container
354    /// ```rust
355    /// # tokio_test::block_on(async {
356    /// # use linera_views::context::MemoryContext;
357    /// # use linera_views::bucket_queue_view::BucketQueueView;
358    /// # use crate::linera_views::views::View;
359    /// # let context = MemoryContext::new_for_testing(());
360    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
361    /// queue.push_back(34);
362    /// assert_eq!(queue.count(), 1);
363    /// # })
364    /// ```
365    pub fn count(&self) -> usize {
366        self.stored_count() + self.new_back_values.len()
367    }
368}
369
370impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
371    /// Gets a reference on the front value if any.
372    /// ```rust
373    /// # tokio_test::block_on(async {
374    /// # use linera_views::context::MemoryContext;
375    /// # use linera_views::bucket_queue_view::BucketQueueView;
376    /// # use crate::linera_views::views::View;
377    /// # let context = MemoryContext::new_for_testing(());
378    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
379    /// queue.push_back(34);
380    /// queue.push_back(42);
381    /// assert_eq!(queue.front().cloned(), Some(34));
382    /// # })
383    /// ```
384    pub fn front(&self) -> Option<&T> {
385        match self.cursor.position {
386            Some((i_block, position)) => {
387                let block = &self.stored_data[i_block].1;
388                let Bucket::Loaded { data } = block else {
389                    unreachable!();
390                };
391                Some(&data[position])
392            }
393            None => self.new_back_values.front(),
394        }
395    }
396
397    /// Reads the front value, if any.
398    /// ```rust
399    /// # tokio_test::block_on(async {
400    /// # use linera_views::context::MemoryContext;
401    /// # use linera_views::bucket_queue_view::BucketQueueView;
402    /// # use crate::linera_views::views::View;
403    /// # let context = MemoryContext::new_for_testing(());
404    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
405    /// queue.push_back(34);
406    /// queue.push_back(42);
407    /// let front = queue.front_mut().unwrap();
408    /// *front = 43;
409    /// assert_eq!(queue.front().cloned(), Some(43));
410    /// # })
411    /// ```
412    pub fn front_mut(&mut self) -> Option<&mut T> {
413        match self.cursor.position {
414            Some((i_block, position)) => {
415                let block = &mut self.stored_data.get_mut(i_block).unwrap().1;
416                let Bucket::Loaded { data } = block else {
417                    unreachable!();
418                };
419                Some(data.get_mut(position).unwrap())
420            }
421            None => self.new_back_values.front_mut(),
422        }
423    }
424
425    /// Deletes the front value, if any.
426    /// ```rust
427    /// # tokio_test::block_on(async {
428    /// # use linera_views::context::MemoryContext;
429    /// # use linera_views::bucket_queue_view::BucketQueueView;
430    /// # use crate::linera_views::views::View;
431    /// # let context = MemoryContext::new_for_testing(());
432    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
433    /// queue.push_back(34 as u128);
434    /// queue.delete_front().await.unwrap();
435    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
436    /// # })
437    /// ```
438    pub async fn delete_front(&mut self) -> Result<(), ViewError> {
439        match self.cursor.position {
440            Some((mut i_block, mut position)) => {
441                position += 1;
442                if self.stored_data[i_block].1.len() == position {
443                    i_block += 1;
444                    position = 0;
445                }
446                if i_block == self.stored_data.len() {
447                    self.cursor = Cursor { position: None };
448                } else {
449                    self.cursor = Cursor {
450                        position: Some((i_block, position)),
451                    };
452                    let (index, bucket) = self.stored_data.get_mut(i_block).unwrap();
453                    let index = *index;
454                    if !bucket.is_loaded() {
455                        let key = self.get_index_key(index)?;
456                        let value = self.context.store().read_value_bytes(&key).await?;
457                        let value = value.ok_or(ViewError::MissingEntries)?;
458                        let data = bcs::from_bytes(&value)?;
459                        self.stored_data[i_block].1 = Bucket::Loaded { data };
460                    }
461                }
462            }
463            None => {
464                self.new_back_values.pop_front();
465            }
466        }
467        Ok(())
468    }
469
    /// Pushes a value to the end of the queue.
    ///
    /// The value is kept among the new back values of the view.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use crate::linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// assert_eq!(queue.elements().await.unwrap(), vec![34]);
    /// # })
    /// ```
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }
485
486    /// Returns the list of elements in the queue.
487    /// ```rust
488    /// # tokio_test::block_on(async {
489    /// # use linera_views::context::MemoryContext;
490    /// # use linera_views::bucket_queue_view::BucketQueueView;
491    /// # use crate::linera_views::views::View;
492    /// # let context = MemoryContext::new_for_testing(());
493    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
494    /// queue.push_back(34);
495    /// queue.push_back(37);
496    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
497    /// # })
498    /// ```
499    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
500        let count = self.count();
501        self.read_context(self.cursor.position, count).await
502    }
503
504    /// Returns the last element of a bucket queue view
505    /// ```rust
506    /// # tokio_test::block_on(async {
507    /// # use linera_views::context::MemoryContext;
508    /// # use linera_views::bucket_queue_view::BucketQueueView;
509    /// # use crate::linera_views::views::View;
510    /// # let context = MemoryContext::new_for_testing(());
511    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
512    /// queue.push_back(34);
513    /// queue.push_back(37);
514    /// assert_eq!(queue.back().await.unwrap(), Some(37));
515    /// # })
516    /// ```
517    pub async fn back(&mut self) -> Result<Option<T>, ViewError>
518    where
519        T: Clone,
520    {
521        if let Some(value) = self.new_back_values.back() {
522            return Ok(Some(value.clone()));
523        }
524        if self.cursor.position.is_none() {
525            return Ok(None);
526        }
527        let Some((index, bucket)) = self.stored_data.back() else {
528            return Ok(None);
529        };
530        if !bucket.is_loaded() {
531            let key = self.get_index_key(*index)?;
532            let value = self.context.store().read_value_bytes(&key).await?;
533            let value = value.as_ref().ok_or(ViewError::MissingEntries)?;
534            let data = bcs::from_bytes::<Vec<T>>(value)?;
535            self.stored_data.back_mut().unwrap().1 = Bucket::Loaded { data };
536        }
537        let bucket = &self.stored_data.back_mut().unwrap().1;
538        let Bucket::Loaded { data } = bucket else {
539            unreachable!();
540        };
541        Ok(Some(data.last().unwrap().clone()))
542    }
543
    /// Reads up to `count` consecutive elements of the queue, starting at `position`
    /// in the stored buckets (or directly in the new back values when `position`
    /// is `None`).
    ///
    /// The buckets that are needed and not loaded are read from the storage with a
    /// single multi-key read; the loaded state of the view is not modified.
    async fn read_context(
        &self,
        position: Option<(usize, usize)>,
        count: usize,
    ) -> Result<Vec<T>, ViewError> {
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut elements = Vec::<T>::new();
        let mut count_remain = count;
        if let Some(pair) = position {
            // First pass: walk the buckets until `count` elements are covered and
            // collect the keys of those that are not loaded.
            let mut keys = Vec::new();
            let (i_block, mut position) = pair;
            for block in i_block..self.stored_data.len() {
                let (index, bucket) = &self.stored_data[block];
                // Only the first visited bucket is entered at a nonzero offset.
                let size = bucket.len() - position;
                if !bucket.is_loaded() {
                    let key = self.get_index_key(*index)?;
                    keys.push(key);
                };
                if size >= count_remain {
                    break;
                }
                count_remain -= size;
                position = 0;
            }
            let values = self.context.store().read_multi_values_bytes(keys).await?;
            // Second pass: walk the same buckets again and copy their elements.
            position = pair.1;
            // Index in `values` of the next bucket that was read from the storage.
            let mut value_pos = 0;
            count_remain = count;
            for block in i_block..self.stored_data.len() {
                let bucket = &self.stored_data[block].1;
                let size = bucket.len() - position;
                let vec = match bucket {
                    Bucket::Loaded { data } => data,
                    Bucket::NotLoaded { .. } => {
                        // The values come in the order in which the keys were
                        // collected by the first pass.
                        let value = values[value_pos]
                            .as_ref()
                            .ok_or(ViewError::MissingEntries)?;
                        value_pos += 1;
                        &bcs::from_bytes::<Vec<T>>(value)?
                    }
                };
                elements.extend(vec[position..].iter().take(count_remain).cloned());
                if size >= count_remain {
                    return Ok(elements);
                }
                count_remain -= size;
                position = 0;
            }
        }
        // What is still missing is taken from the new back values.
        let count_read = std::cmp::min(count_remain, self.new_back_values.len());
        elements.extend(self.new_back_values.range(0..count_read).cloned());
        Ok(elements)
    }
599
600    /// Returns the first elements of a bucket queue view
601    /// ```rust
602    /// # tokio_test::block_on(async {
603    /// # use linera_views::context::MemoryContext;
604    /// # use linera_views::bucket_queue_view::BucketQueueView;
605    /// # use crate::linera_views::views::View;
606    /// # let context = MemoryContext::new_for_testing(());
607    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
608    /// queue.push_back(34);
609    /// queue.push_back(37);
610    /// queue.push_back(47);
611    /// assert_eq!(queue.read_front(2).await.unwrap(), vec![34, 37]);
612    /// # })
613    /// ```
614    pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
615        let count = std::cmp::min(count, self.count());
616        self.read_context(self.cursor.position, count).await
617    }
618
619    /// Returns the last element of a bucket queue view
620    /// ```rust
621    /// # tokio_test::block_on(async {
622    /// # use linera_views::context::MemoryContext;
623    /// # use linera_views::bucket_queue_view::BucketQueueView;
624    /// # use crate::linera_views::views::View;
625    /// # let context = MemoryContext::new_for_testing(());
626    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
627    /// queue.push_back(34);
628    /// queue.push_back(37);
629    /// queue.push_back(47);
630    /// assert_eq!(queue.read_back(2).await.unwrap(), vec![37, 47]);
631    /// # })
632    /// ```
633    pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
634        let count = std::cmp::min(count, self.count());
635        if count <= self.new_back_values.len() {
636            let start = self.new_back_values.len() - count;
637            Ok(self
638                .new_back_values
639                .range(start..)
640                .cloned()
641                .collect::<Vec<_>>())
642        } else {
643            let mut increment = self.count() - count;
644            let Some((i_block, mut position)) = self.cursor.position else {
645                unreachable!();
646            };
647            for block in i_block..self.stored_data.len() {
648                let size = self.stored_data[block].1.len() - position;
649                if increment < size {
650                    return self
651                        .read_context(Some((block, position + increment)), count)
652                        .await;
653                }
654                increment -= size;
655                position = 0;
656            }
657            unreachable!();
658        }
659    }
660
661    async fn load_all(&mut self) -> Result<(), ViewError> {
662        if !self.delete_storage_first {
663            let elements = self.elements().await?;
664            self.new_back_values.clear();
665            for elt in elements {
666                self.new_back_values.push_back(elt);
667            }
668            self.cursor = Cursor { position: None };
669            self.delete_storage_first = true;
670        }
671        Ok(())
672    }
673
    /// Gets a mutable iterator on the entries of the queue.
    ///
    /// All the entries are first brought into memory with `load_all`, which also
    /// marks the stored data for deletion by the next flush.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// let mut iter = queue.iter_mut().await.unwrap();
    /// let value = iter.next().unwrap();
    /// *value = 42;
    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
    /// # })
    /// ```
    pub async fn iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
693}
694
// The hash of the view is the SHA3-256 hash of the BCS serialization of the list
// of its elements.
impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
    for BucketQueueView<C, T, N>
where
    Self: View,
{
    type Hasher = sha3::Sha3_256;

    /// Computes the hash of the view; this simply calls `hash`.
    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.hash().await
    }

    /// Computes the hash of the elements of the queue, pending changes included.
    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        // The binding keeps the latency measurement alive until the end of the function.
        #[cfg(with_metrics)]
        let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}
715
/// Type wrapping [`BucketQueueView`] while memoizing the hash.
pub type HashedBucketQueueView<C, T, const N: usize> =
    WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
719
// GraphQL support of the view; only compiled when `with_graphql` is set.
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::BucketQueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    // The GraphQL type name is built from the mangled type name of `T` followed
    // by the value of `hash_name::<T>()` written as hexadecimal digits.
    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
        for BucketQueueView<C, T, N>
    {
        fn type_name() -> Cow<'static, str> {
            format!(
                "BucketQueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
    where
        C: Send + Sync,
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Returns the first `count` entries of the queue, or all of them when
        // `count` is not provided.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}