linera_views/views/
bucket_queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{vec_deque::IterMut, VecDeque};
5
6#[cfg(with_metrics)]
7use linera_base::prometheus_util::MeasureLatency as _;
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9
10use crate::{
11    batch::Batch,
12    common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
13    context::Context,
14    hashable_wrapper::WrappedHashableContainerView,
15    historical_hash_wrapper::HistoricallyHashableView,
16    store::ReadableKeyValueStore as _,
17    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
18};
19
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Latency histogram recording how long each `BucketQueueView` hash computation takes.
    ///
    /// The histogram is registered the first time the static is accessed.
    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        let name = "bucket_queue_view_hash_runtime";
        let description = "BucketQueueView hash runtime";
        let buckets = exponential_bucket_latencies(5.0);
        register_histogram_vec(name, description, &[], buckets)
    });
}
37
/// Key tags to create the sub-keys of a [`BucketQueueView`] on top of the base key.
/// * `Front` holds the first stored bucket; it is requested when the view is loaded.
/// * `Store` holds the description of all the stored buckets (see [`StoredIndices`]);
///   it is requested when the view is loaded as well.
/// * `Index` is the prefix under which every other bucket is stored, one key per bucket.
#[repr(u8)]
enum KeyTag {
    /// Key of the bucket whose storage index is 0, i.e. the front bucket of the queue.
    Front = MIN_VIEW_TAG,
    /// Key of the serialized [`StoredIndices`].
    Store,
    /// Prefix of the keys of the buckets whose storage index is not 0.
    Index,
}
51
/// The `StoredIndices` contains the description of the stored buckets.
/// It is the value serialized under the `KeyTag::Store` key.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct StoredIndices {
    /// One `(length, storage index)` pair per stored bucket, in queue order.
    /// The length is the number of entries of the bucket (buckets are written in
    /// chunks of at most `N` entries). A storage index of 0 designates the bucket
    /// stored under `KeyTag::Front`; any other storage index designates a bucket
    /// stored under a key with prefix `KeyTag::Index`.
    indices: Vec<(usize, usize)>,
    /// The offset of the front entry of the queue inside the first bucket.
    position: usize,
}

impl StoredIndices {
    /// Returns the number of stored buckets that are described.
    fn len(&self) -> usize {
        self.indices.len()
    }
}
69
/// The `Cursor` records where the front of the queue currently is.
///
/// * `position == None`: no stored entry remains in front, so front operations work on
///   the `new_back_values`.
/// * `position == Some((i_block, offset))`: the front entry is the entry number `offset`
///   of the bucket at position `i_block` in the list of stored buckets.
#[derive(Clone, Debug)]
struct Cursor {
    position: Option<(usize, usize)>,
}

impl Cursor {
    /// Creates the cursor of a queue having `number_bucket` stored buckets and whose front
    /// is at offset `position` of the first bucket. Without any stored bucket, the cursor
    /// has no position.
    pub fn new(number_bucket: usize, position: usize) -> Self {
        let has_stored_bucket = number_bucket != 0;
        Cursor {
            position: has_stored_bucket.then_some((0, position)),
        }
    }

    /// Tells whether the cursor points into the stored buckets.
    pub fn is_incrementable(&self) -> bool {
        self.position.is_some()
    }
}
95
/// A bucket of consecutive queue entries: either its entries are in memory, or only
/// its number of entries is known.
#[derive(Clone, Debug)]
enum Bucket<T> {
    /// The entries of the bucket are available in memory.
    Loaded { data: Vec<T> },
    /// The entries have not been read; only their number is known.
    NotLoaded { length: usize },
}

impl<T> Bucket<T> {
    /// Returns the number of entries of the bucket, whether it is loaded or not.
    fn len(&self) -> usize {
        match *self {
            Bucket::NotLoaded { length } => length,
            Bucket::Loaded { ref data } => data.len(),
        }
    }

    /// Tells whether the entries of the bucket are available in memory.
    fn is_loaded(&self) -> bool {
        matches!(self, Bucket::Loaded { .. })
    }
}
117
118fn stored_indices<T>(stored_data: &VecDeque<(usize, Bucket<T>)>, position: usize) -> StoredIndices {
119    let indices = stored_data
120        .iter()
121        .map(|(index, bucket)| (bucket.len(), *index))
122        .collect::<Vec<_>>();
123    StoredIndices { indices, position }
124}
125
/// A view that supports a FIFO queue for values of type `T`.
/// The entries are stored in buckets of at most `N` entries each.
/// The size `N` has to be chosen by taking into account the size of the type `T`
/// and the basic size of a block. For example a total size of 100 bytes to 10 KB
/// seems adequate.
#[derive(Debug)]
pub struct BucketQueueView<C, T, const N: usize> {
    /// The context giving access to the base key and to the storage.
    context: C,
    /// The stored buckets as pairs `(storage index, bucket)`, in queue order. A bucket
    /// is either loaded in memory or only known by its length. The first bucket is
    /// expected to be loaded.
    stored_data: VecDeque<(usize, Bucket<T>)>,
    /// The newly inserted back values, not yet written to storage.
    new_back_values: VecDeque<T>,
    /// The offset of the front entry inside the first stored bucket, as it was last
    /// loaded or flushed.
    stored_position: usize,
    /// The current position of the front in the `stored_data`.
    cursor: Cursor,
    /// Whether the stored data has to be deleted at the next flush, before any new
    /// value is written.
    delete_storage_first: bool,
}
144
145impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
146where
147    C: Context,
148    T: Send + Sync + Clone + Serialize + DeserializeOwned,
149{
150    const NUM_INIT_KEYS: usize = 2;
151
152    type Context = C;
153
    /// Returns a reference to the context held by this view.
    fn context(&self) -> &C {
        &self.context
    }
157
158    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
159        let key1 = context.base_key().base_tag(KeyTag::Front as u8);
160        let key2 = context.base_key().base_tag(KeyTag::Store as u8);
161        Ok(vec![key1, key2])
162    }
163
164    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
165        let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
166        let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
167        let front = from_bytes_option::<Vec<T>>(value1)?;
168        let mut stored_data = VecDeque::from(match front {
169            Some(front) => {
170                vec![(0, Bucket::Loaded { data: front })]
171            }
172            None => {
173                vec![]
174            }
175        });
176        let stored_indices = from_bytes_option_or_default::<StoredIndices>(value2)?;
177        for i in 1..stored_indices.len() {
178            let length = stored_indices.indices[i].0;
179            let index = stored_indices.indices[i].1;
180            stored_data.push_back((index, Bucket::NotLoaded { length }));
181        }
182        let cursor = Cursor::new(stored_indices.len(), stored_indices.position);
183        Ok(Self {
184            context,
185            stored_data,
186            stored_position: stored_indices.position,
187            new_back_values: VecDeque::new(),
188            cursor,
189            delete_storage_first: false,
190        })
191    }
192
193    fn rollback(&mut self) {
194        self.delete_storage_first = false;
195        self.cursor = Cursor::new(self.stored_data.len(), self.stored_position);
196        self.new_back_values.clear();
197    }
198
199    async fn has_pending_changes(&self) -> bool {
200        if self.delete_storage_first {
201            return true;
202        }
203        if !self.stored_data.is_empty() {
204            let Some((i_block, position)) = self.cursor.position else {
205                return true;
206            };
207            if i_block != 0 || position != self.stored_position {
208                return true;
209            }
210        }
211        !self.new_back_values.is_empty()
212    }
213
    /// Writes the pending changes of the queue into `batch` and updates the in-memory
    /// description of the stored buckets so that it matches what the batch will store.
    ///
    /// Returns `true` when the storage was marked for deletion and no new value is
    /// written, and `false` otherwise.
    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.delete_storage_first {
            let key_prefix = self.context.base_key().bytes.clone();
            batch.delete_key_prefix(key_prefix);
            delete_view = true;
        }
        if self.stored_count() == 0 {
            // No stored entry remains in the queue: all the keys of the view are
            // deleted and the stored buckets are forgotten.
            let key_prefix = self.context.base_key().bytes.clone();
            batch.delete_key_prefix(key_prefix);
            self.stored_data.clear();
            self.stored_position = 0;
        } else if let Some((i_block, position)) = self.cursor.position {
            // The buckets located entirely before the cursor are removed.
            for _ in 0..i_block {
                let block = self.stored_data.pop_front().unwrap();
                let index = block.0;
                let key = self.get_index_key(index)?;
                batch.delete_key(key);
            }
            // The bucket under the cursor is now the first one.
            self.cursor = Cursor {
                position: Some((0, position)),
            };
            self.stored_position = position;
            // We need to ensure that the first index is in the front.
            // The first bucket has to be stored under storage index 0 (the
            // `KeyTag::Front` key); if it was stored under another index, it is moved.
            let first_index = self.stored_data[0].0;
            if first_index != 0 {
                self.stored_data[0].0 = 0;
                let key = self.get_index_key(first_index)?;
                batch.delete_key(key);
                let key = self.get_index_key(0)?;
                let (_, data0) = self.stored_data.front().unwrap();
                // The bucket under the cursor is expected to be loaded
                // (`delete_front` loads it when the cursor enters it).
                let Bucket::Loaded { data } = data0 else {
                    unreachable!();
                };
                batch.put_key_value(key, &data)?;
            }
        }
        if !self.new_back_values.is_empty() {
            // Something is written, so the view is not deleted.
            delete_view = false;
            // New buckets are numbered after the last stored bucket, or from 0 when
            // no stored bucket remains.
            let mut unused_index = match self.stored_data.back() {
                Some((index, _)) => index + 1,
                None => 0,
            };
            let new_back_values = std::mem::take(&mut self.new_back_values);
            let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
            // The new values are written as buckets of at most `N` entries, which
            // are kept loaded in memory.
            for value_chunk in new_back_values.chunks(N) {
                let key = self.get_index_key(unused_index)?;
                batch.put_key_value(key, &value_chunk)?;
                self.stored_data.push_back((
                    unused_index,
                    Bucket::Loaded {
                        data: value_chunk.to_vec(),
                    },
                ));
                unused_index += 1;
            }
            // Without a cursor in the stored buckets, the front is the first of the
            // entries that were just written.
            if !self.cursor.is_incrementable() {
                self.cursor = Cursor {
                    position: Some((0, 0)),
                }
            }
        }
        // The description of the buckets is written, except when the storage was
        // marked for deletion and no bucket remains.
        if !self.delete_storage_first || !self.stored_data.is_empty() {
            let stored_indices = stored_indices(&self.stored_data, self.stored_position);
            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
            batch.put_key_value(key, &stored_indices)?;
        }
        self.delete_storage_first = false;
        Ok(delete_view)
    }
284
285    fn clear(&mut self) {
286        self.delete_storage_first = true;
287        self.new_back_values.clear();
288        self.cursor.position = None;
289    }
290}
291
292impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
293where
294    Self: View,
295{
296    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
297        Ok(BucketQueueView {
298            context: self.context.clone(),
299            stored_data: self.stored_data.clone(),
300            new_back_values: self.new_back_values.clone(),
301            stored_position: self.stored_position,
302            cursor: self.cursor.clone(),
303            delete_storage_first: self.delete_storage_first,
304        })
305    }
306}
307
308impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
309    /// Gets the key corresponding to the index
310    fn get_index_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
311        Ok(if index == 0 {
312            self.context.base_key().base_tag(KeyTag::Front as u8)
313        } else {
314            self.context
315                .base_key()
316                .derive_tag_key(KeyTag::Index as u8, &index)?
317        })
318    }
319
320    /// Gets the number of entries in the container that are stored
321    /// ```rust
322    /// # tokio_test::block_on(async {
323    /// # use linera_views::context::MemoryContext;
324    /// # use linera_views::bucket_queue_view::BucketQueueView;
325    /// # use crate::linera_views::views::View;
326    /// # let context = MemoryContext::new_for_testing(());
327    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
328    /// queue.push_back(34);
329    /// assert_eq!(queue.stored_count(), 0);
330    /// # })
331    /// ```
332    pub fn stored_count(&self) -> usize {
333        if self.delete_storage_first {
334            0
335        } else {
336            let Some((i_block, position)) = self.cursor.position else {
337                return 0;
338            };
339            let mut stored_count = 0;
340            for block in i_block..self.stored_data.len() {
341                stored_count += self.stored_data[block].1.len();
342            }
343            stored_count -= position;
344            stored_count
345        }
346    }
347
348    /// The total number of entries of the container
349    /// ```rust
350    /// # tokio_test::block_on(async {
351    /// # use linera_views::context::MemoryContext;
352    /// # use linera_views::bucket_queue_view::BucketQueueView;
353    /// # use crate::linera_views::views::View;
354    /// # let context = MemoryContext::new_for_testing(());
355    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
356    /// queue.push_back(34);
357    /// assert_eq!(queue.count(), 1);
358    /// # })
359    /// ```
360    pub fn count(&self) -> usize {
361        self.stored_count() + self.new_back_values.len()
362    }
363}
364
365impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
366    /// Gets a reference on the front value if any.
367    /// ```rust
368    /// # tokio_test::block_on(async {
369    /// # use linera_views::context::MemoryContext;
370    /// # use linera_views::bucket_queue_view::BucketQueueView;
371    /// # use crate::linera_views::views::View;
372    /// # let context = MemoryContext::new_for_testing(());
373    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
374    /// queue.push_back(34);
375    /// queue.push_back(42);
376    /// assert_eq!(queue.front().cloned(), Some(34));
377    /// # })
378    /// ```
379    pub fn front(&self) -> Option<&T> {
380        match self.cursor.position {
381            Some((i_block, position)) => {
382                let block = &self.stored_data[i_block].1;
383                let Bucket::Loaded { data } = block else {
384                    unreachable!();
385                };
386                Some(&data[position])
387            }
388            None => self.new_back_values.front(),
389        }
390    }
391
392    /// Reads the front value, if any.
393    /// ```rust
394    /// # tokio_test::block_on(async {
395    /// # use linera_views::context::MemoryContext;
396    /// # use linera_views::bucket_queue_view::BucketQueueView;
397    /// # use crate::linera_views::views::View;
398    /// # let context = MemoryContext::new_for_testing(());
399    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
400    /// queue.push_back(34);
401    /// queue.push_back(42);
402    /// let front = queue.front_mut().unwrap();
403    /// *front = 43;
404    /// assert_eq!(queue.front().cloned(), Some(43));
405    /// # })
406    /// ```
407    pub fn front_mut(&mut self) -> Option<&mut T> {
408        match self.cursor.position {
409            Some((i_block, position)) => {
410                let block = &mut self.stored_data.get_mut(i_block).unwrap().1;
411                let Bucket::Loaded { data } = block else {
412                    unreachable!();
413                };
414                Some(data.get_mut(position).unwrap())
415            }
416            None => self.new_back_values.front_mut(),
417        }
418    }
419
420    /// Deletes the front value, if any.
421    /// ```rust
422    /// # tokio_test::block_on(async {
423    /// # use linera_views::context::MemoryContext;
424    /// # use linera_views::bucket_queue_view::BucketQueueView;
425    /// # use crate::linera_views::views::View;
426    /// # let context = MemoryContext::new_for_testing(());
427    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
428    /// queue.push_back(34 as u128);
429    /// queue.delete_front().await.unwrap();
430    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
431    /// # })
432    /// ```
433    pub async fn delete_front(&mut self) -> Result<(), ViewError> {
434        match self.cursor.position {
435            Some((mut i_block, mut position)) => {
436                position += 1;
437                if self.stored_data[i_block].1.len() == position {
438                    i_block += 1;
439                    position = 0;
440                }
441                if i_block == self.stored_data.len() {
442                    self.cursor = Cursor { position: None };
443                } else {
444                    self.cursor = Cursor {
445                        position: Some((i_block, position)),
446                    };
447                    let (index, bucket) = self.stored_data.get_mut(i_block).unwrap();
448                    let index = *index;
449                    if !bucket.is_loaded() {
450                        let key = self.get_index_key(index)?;
451                        let value = self.context.store().read_value_bytes(&key).await?;
452                        let value = value.ok_or(ViewError::MissingEntries)?;
453                        let data = bcs::from_bytes(&value)?;
454                        self.stored_data[i_block].1 = Bucket::Loaded { data };
455                    }
456                }
457            }
458            None => {
459                self.new_back_values.pop_front();
460            }
461        }
462        Ok(())
463    }
464
    /// Pushes a value to the end of the queue. The value is kept in memory with the
    /// other new back values until the view is flushed.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use crate::linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// assert_eq!(queue.elements().await.unwrap(), vec![34]);
    /// # })
    /// ```
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }
480
481    /// Returns the list of elements in the queue.
482    /// ```rust
483    /// # tokio_test::block_on(async {
484    /// # use linera_views::context::MemoryContext;
485    /// # use linera_views::bucket_queue_view::BucketQueueView;
486    /// # use crate::linera_views::views::View;
487    /// # let context = MemoryContext::new_for_testing(());
488    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
489    /// queue.push_back(34);
490    /// queue.push_back(37);
491    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
492    /// # })
493    /// ```
494    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
495        let count = self.count();
496        self.read_context(self.cursor.position, count).await
497    }
498
499    /// Returns the last element of a bucket queue view
500    /// ```rust
501    /// # tokio_test::block_on(async {
502    /// # use linera_views::context::MemoryContext;
503    /// # use linera_views::bucket_queue_view::BucketQueueView;
504    /// # use crate::linera_views::views::View;
505    /// # let context = MemoryContext::new_for_testing(());
506    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
507    /// queue.push_back(34);
508    /// queue.push_back(37);
509    /// assert_eq!(queue.back().await.unwrap(), Some(37));
510    /// # })
511    /// ```
512    pub async fn back(&mut self) -> Result<Option<T>, ViewError>
513    where
514        T: Clone,
515    {
516        if let Some(value) = self.new_back_values.back() {
517            return Ok(Some(value.clone()));
518        }
519        if self.cursor.position.is_none() {
520            return Ok(None);
521        }
522        let Some((index, bucket)) = self.stored_data.back() else {
523            return Ok(None);
524        };
525        if !bucket.is_loaded() {
526            let key = self.get_index_key(*index)?;
527            let value = self.context.store().read_value_bytes(&key).await?;
528            let value = value.as_ref().ok_or(ViewError::MissingEntries)?;
529            let data = bcs::from_bytes::<Vec<T>>(value)?;
530            self.stored_data.back_mut().unwrap().1 = Bucket::Loaded { data };
531        }
532        let bucket = &self.stored_data.back_mut().unwrap().1;
533        let Bucket::Loaded { data } = bucket else {
534            unreachable!();
535        };
536        Ok(Some(data.last().unwrap().clone()))
537    }
538
539    async fn read_context(
540        &self,
541        position: Option<(usize, usize)>,
542        count: usize,
543    ) -> Result<Vec<T>, ViewError> {
544        if count == 0 {
545            return Ok(Vec::new());
546        }
547        let mut elements = Vec::<T>::new();
548        let mut count_remain = count;
549        if let Some(pair) = position {
550            let mut keys = Vec::new();
551            let (i_block, mut position) = pair;
552            for block in i_block..self.stored_data.len() {
553                let (index, bucket) = &self.stored_data[block];
554                let size = bucket.len() - position;
555                if !bucket.is_loaded() {
556                    let key = self.get_index_key(*index)?;
557                    keys.push(key);
558                };
559                if size >= count_remain {
560                    break;
561                }
562                count_remain -= size;
563                position = 0;
564            }
565            let values = self.context.store().read_multi_values_bytes(keys).await?;
566            position = pair.1;
567            let mut value_pos = 0;
568            count_remain = count;
569            for block in i_block..self.stored_data.len() {
570                let bucket = &self.stored_data[block].1;
571                let size = bucket.len() - position;
572                let vec = match bucket {
573                    Bucket::Loaded { data } => data,
574                    Bucket::NotLoaded { .. } => {
575                        let value = values[value_pos]
576                            .as_ref()
577                            .ok_or(ViewError::MissingEntries)?;
578                        value_pos += 1;
579                        &bcs::from_bytes::<Vec<T>>(value)?
580                    }
581                };
582                elements.extend(vec[position..].iter().take(count_remain).cloned());
583                if size >= count_remain {
584                    return Ok(elements);
585                }
586                count_remain -= size;
587                position = 0;
588            }
589        }
590        let count_read = std::cmp::min(count_remain, self.new_back_values.len());
591        elements.extend(self.new_back_values.range(0..count_read).cloned());
592        Ok(elements)
593    }
594
595    /// Returns the first elements of a bucket queue view
596    /// ```rust
597    /// # tokio_test::block_on(async {
598    /// # use linera_views::context::MemoryContext;
599    /// # use linera_views::bucket_queue_view::BucketQueueView;
600    /// # use crate::linera_views::views::View;
601    /// # let context = MemoryContext::new_for_testing(());
602    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
603    /// queue.push_back(34);
604    /// queue.push_back(37);
605    /// queue.push_back(47);
606    /// assert_eq!(queue.read_front(2).await.unwrap(), vec![34, 37]);
607    /// # })
608    /// ```
609    pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
610        let count = std::cmp::min(count, self.count());
611        self.read_context(self.cursor.position, count).await
612    }
613
614    /// Returns the last element of a bucket queue view
615    /// ```rust
616    /// # tokio_test::block_on(async {
617    /// # use linera_views::context::MemoryContext;
618    /// # use linera_views::bucket_queue_view::BucketQueueView;
619    /// # use crate::linera_views::views::View;
620    /// # let context = MemoryContext::new_for_testing(());
621    /// let mut queue = BucketQueueView::<_, u128, 5>::load(context).await.unwrap();
622    /// queue.push_back(34);
623    /// queue.push_back(37);
624    /// queue.push_back(47);
625    /// assert_eq!(queue.read_back(2).await.unwrap(), vec![37, 47]);
626    /// # })
627    /// ```
628    pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
629        let count = std::cmp::min(count, self.count());
630        if count <= self.new_back_values.len() {
631            let start = self.new_back_values.len() - count;
632            Ok(self
633                .new_back_values
634                .range(start..)
635                .cloned()
636                .collect::<Vec<_>>())
637        } else {
638            let mut increment = self.count() - count;
639            let Some((i_block, mut position)) = self.cursor.position else {
640                unreachable!();
641            };
642            for block in i_block..self.stored_data.len() {
643                let size = self.stored_data[block].1.len() - position;
644                if increment < size {
645                    return self
646                        .read_context(Some((block, position + increment)), count)
647                        .await;
648                }
649                increment -= size;
650                position = 0;
651            }
652            unreachable!();
653        }
654    }
655
656    async fn load_all(&mut self) -> Result<(), ViewError> {
657        if !self.delete_storage_first {
658            let elements = self.elements().await?;
659            self.new_back_values.clear();
660            for elt in elements {
661                self.new_back_values.push_back(elt);
662            }
663            self.cursor = Cursor { position: None };
664            self.delete_storage_first = true;
665        }
666        Ok(())
667    }
668
    /// Gets a mutable iterator on the entries of the queue, from the front to the back.
    ///
    /// All the entries are first loaded in memory, and the storage is marked for
    /// deletion so that the next flush writes the whole queue again.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::bucket_queue_view::BucketQueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await.unwrap();
    /// queue.push_back(34);
    /// let mut iter = queue.iter_mut().await.unwrap();
    /// let value = iter.next().unwrap();
    /// *value = 42;
    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
    /// # })
    /// ```
    pub async fn iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
688}
689
690impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
691    for BucketQueueView<C, T, N>
692where
693    Self: View,
694{
695    type Hasher = sha3::Sha3_256;
696
697    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
698        self.hash().await
699    }
700
701    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
702        #[cfg(with_metrics)]
703        let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
704        let elements = self.elements().await?;
705        let mut hasher = sha3::Sha3_256::default();
706        hasher.update_with_bcs_bytes(&elements)?;
707        Ok(hasher.finalize())
708    }
709}
710
/// Type wrapping `BucketQueueView` while memoizing the hash.
pub type HashedBucketQueueView<C, T, const N: usize> =
    WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;

/// Wrapper around `BucketQueueView` to compute hashes based on the history of changes.
pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
    HistoricallyHashableView<C, BucketQueueView<C, T, N>>;
718
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::BucketQueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
        for BucketQueueView<C, T, N>
    {
        // The name is built from the mangled name of the entry type and from a hash of
        // that type.
        fn type_name() -> Cow<'static, str> {
            let mangled = mangle(T::type_name());
            let hash = hash_name::<T>();
            Cow::Owned(format!("BucketQueueView_{}_{:08x}", mangled, hash))
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
    where
        C: Send + Sync,
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Number of entries of the queue. The cast to `u32` truncates larger counts.
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            let total = self.count();
            Ok(total as u32)
        }

        // The first `count` entries of the queue, or all of them without a `count`.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            let limit = match count {
                Some(limit) => limit,
                None => self.count(),
            };
            Ok(self.read_front(limit).await?)
        }
    }
}