Skip to main content

linera_views/views/
queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{vec_deque::IterMut, VecDeque},
6    ops::Range,
7};
8
9use allocative::Allocative;
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{data_types::ArithmeticError, visit_allocative_simple};
13use serde::{de::DeserializeOwned, Serialize};
14
15use crate::{
16    batch::Batch,
17    common::{from_bytes_option_or_default, HasherOutput},
18    context::Context,
19    hashable_wrapper::WrappedHashableContainerView,
20    historical_hash_wrapper::HistoricallyHashableView,
21    store::ReadableKeyValueStore as _,
22    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
23};
24
/// Prometheus metrics for `QueueView`; only compiled when the `with_metrics` cfg is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// The runtime of hash computation.
    ///
    /// The histogram is registered lazily, the first time this static is accessed.
    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            // Metric name.
            "queue_view_hash_runtime",
            // Human-readable description.
            "QueueView hash runtime",
            // NOTE(review): presumably the label names (none here) — confirm against
            // `register_histogram_vec`.
            &[],
            // NOTE(review): presumably 5.0 is the largest latency bucket — confirm the unit
            // against `exponential_bucket_latencies`.
            exponential_bucket_latencies(5.0),
        )
    });
}
42
/// Key tags to create the sub-keys of a `QueueView` on top of the base key.
///
/// Each variant is cast to `u8` and combined with the base key through `base_tag`
/// or `derive_tag_key`.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the storing of the variable `stored_indices`.
    Store = MIN_VIEW_TAG,
    /// Prefix for the entries of the queue; each entry's key is derived from its index.
    Index,
}
51
/// A view that supports a FIFO queue for values of type `T`.
///
/// Entries already persisted are addressed by the indices in `stored_indices`;
/// every modification is staged in memory in the remaining fields until the view is saved.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative")]
pub struct QueueView<C, T> {
    /// The view context.
    #[allocative(skip)]
    context: C,
    /// The range of indices for entries persisted in storage.
    #[allocative(visit = visit_allocative_simple)]
    stored_indices: Range<u32>,
    /// The number of persisted entries, counted from `stored_indices.start`, that are
    /// staged for deletion.
    front_delete_count: u32,
    /// Whether to clear storage before applying updates. While this is set, the
    /// persisted entries are treated as absent.
    delete_storage_first: bool,
    /// New values added to the back, not yet persisted to storage.
    new_back_values: VecDeque<T>,
}
69
70impl<C, T> View for QueueView<C, T>
71where
72    C: Context,
73    T: Serialize + Send + Sync,
74{
75    const NUM_INIT_KEYS: usize = 1;
76
77    type Context = C;
78
    /// Returns a clone of the context held by this view.
    fn context(&self) -> C {
        self.context.clone()
    }
82
83    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
84        Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
85    }
86
87    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
88        let stored_indices =
89            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
90        Ok(Self {
91            context,
92            stored_indices,
93            front_delete_count: 0,
94            delete_storage_first: false,
95            new_back_values: VecDeque::new(),
96        })
97    }
98
99    fn rollback(&mut self) {
100        self.delete_storage_first = false;
101        self.front_delete_count = 0;
102        self.new_back_values.clear();
103    }
104
105    async fn has_pending_changes(&self) -> bool {
106        if self.delete_storage_first {
107            return true;
108        }
109        if self.front_delete_count > 0 {
110            return true;
111        }
112        !self.new_back_values.is_empty()
113    }
114
115    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
116        let mut delete_view = false;
117        if self.delete_storage_first {
118            batch.delete_key_prefix(self.context.base_key().bytes.clone());
119            delete_view = true;
120        }
121        let mut new_stored_indices = self.stored_indices.clone();
122        if self.stored_count() == 0 {
123            let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
124            batch.delete_key_prefix(key_prefix);
125            new_stored_indices = Range::default();
126        } else if self.front_delete_count > 0 {
127            let deletion_end = new_stored_indices.start + self.front_delete_count;
128            for index in new_stored_indices.start..deletion_end {
129                let key = self
130                    .context
131                    .base_key()
132                    .derive_tag_key(KeyTag::Index as u8, &index)?;
133                batch.delete_key(key);
134            }
135            new_stored_indices.start = deletion_end;
136        }
137        if !self.new_back_values.is_empty() {
138            delete_view = false;
139            let new_back_len =
140                u32::try_from(self.new_back_values.len()).map_err(|_| ArithmeticError::Overflow)?;
141            new_stored_indices.end = new_stored_indices
142                .end
143                .checked_add(new_back_len)
144                .ok_or(ArithmeticError::Overflow)?;
145            let start = new_stored_indices.end - new_back_len;
146            for (index, value) in (start..).zip(&self.new_back_values) {
147                let key = self
148                    .context
149                    .base_key()
150                    .derive_tag_key(KeyTag::Index as u8, &index)?;
151                batch.put_key_value(key, value)?;
152            }
153        }
154        if !self.delete_storage_first || !new_stored_indices.is_empty() {
155            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
156            batch.put_key_value(key, &new_stored_indices)?;
157        }
158        Ok(delete_view)
159    }
160
161    fn post_save(&mut self) {
162        if self.stored_count() == 0 {
163            self.stored_indices = Range::default();
164        } else if self.front_delete_count > 0 {
165            self.stored_indices.start += self.front_delete_count;
166        }
167        if !self.new_back_values.is_empty() {
168            self.stored_indices.end +=
169                u32::try_from(self.new_back_values.len()).expect("verified in pre_save");
170            self.new_back_values.clear();
171        }
172        self.front_delete_count = 0;
173        self.delete_storage_first = false;
174    }
175
176    fn clear(&mut self) {
177        self.delete_storage_first = true;
178        self.new_back_values.clear();
179    }
180}
181
182impl<C, T> ClonableView for QueueView<C, T>
183where
184    C: Context,
185    T: Clone + Send + Sync + Serialize,
186{
187    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
188        Ok(QueueView {
189            context: self.context.clone(),
190            stored_indices: self.stored_indices.clone(),
191            front_delete_count: self.front_delete_count,
192            delete_storage_first: self.delete_storage_first,
193            new_back_values: self.new_back_values.clone(),
194        })
195    }
196}
197
198impl<C, T> QueueView<C, T> {
199    fn stored_count(&self) -> u32 {
200        if self.delete_storage_first {
201            0
202        } else {
203            (self.stored_indices.end - self.stored_indices.start) - self.front_delete_count
204        }
205    }
206}
207
208impl<'a, C, T> QueueView<C, T>
209where
210    C: Context,
211    T: Send + Sync + Clone + Serialize + DeserializeOwned,
212{
213    async fn get(&self, index: u32) -> Result<Option<T>, ViewError> {
214        let key = self
215            .context
216            .base_key()
217            .derive_tag_key(KeyTag::Index as u8, &index)?;
218        Ok(self.context.store().read_value(&key).await?)
219    }
220
221    /// Reads the front value, if any.
222    /// ```rust
223    /// # tokio_test::block_on(async {
224    /// # use linera_views::context::MemoryContext;
225    /// # use linera_views::queue_view::QueueView;
226    /// # use linera_views::views::View;
227    /// # let context = MemoryContext::new_for_testing(());
228    /// let mut queue = QueueView::load(context).await.unwrap();
229    /// queue.push_back(34);
230    /// queue.push_back(42);
231    /// assert_eq!(queue.front().await.unwrap(), Some(34));
232    /// # })
233    /// ```
234    pub async fn front(&self) -> Result<Option<T>, ViewError> {
235        let stored_remainder = self.stored_count();
236        let value = if stored_remainder > 0 {
237            self.get(self.stored_indices.end - stored_remainder).await?
238        } else {
239            self.new_back_values.front().cloned()
240        };
241        Ok(value)
242    }
243
244    /// Reads the back value, if any.
245    /// ```rust
246    /// # tokio_test::block_on(async {
247    /// # use linera_views::context::MemoryContext;
248    /// # use linera_views::queue_view::QueueView;
249    /// # use linera_views::views::View;
250    /// # let context = MemoryContext::new_for_testing(());
251    /// let mut queue = QueueView::load(context).await.unwrap();
252    /// queue.push_back(34);
253    /// queue.push_back(42);
254    /// assert_eq!(queue.back().await.unwrap(), Some(42));
255    /// # })
256    /// ```
257    pub async fn back(&self) -> Result<Option<T>, ViewError> {
258        Ok(match self.new_back_values.back() {
259            Some(value) => Some(value.clone()),
260            None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
261            _ => None,
262        })
263    }
264
265    /// Deletes the front value, if any.
266    /// ```rust
267    /// # tokio_test::block_on(async {
268    /// # use linera_views::context::MemoryContext;
269    /// # use linera_views::queue_view::QueueView;
270    /// # use linera_views::views::View;
271    /// # let context = MemoryContext::new_for_testing(());
272    /// let mut queue = QueueView::load(context).await.unwrap();
273    /// queue.push_back(34 as u128);
274    /// queue.delete_front();
275    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
276    /// # })
277    /// ```
278    pub fn delete_front(&mut self) {
279        if self.stored_count() > 0 {
280            self.front_delete_count += 1;
281        } else {
282            self.new_back_values.pop_front();
283        }
284    }
285
    /// Pushes a value to the end of the queue.
    ///
    /// The value is only staged in memory; it is written to storage when the view is saved.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(37);
    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
    /// # })
    /// ```
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }
302
303    /// Reads the size of the queue.
304    /// ```rust
305    /// # tokio_test::block_on(async {
306    /// # use linera_views::context::MemoryContext;
307    /// # use linera_views::queue_view::QueueView;
308    /// # use linera_views::views::View;
309    /// # let context = MemoryContext::new_for_testing(());
310    /// let mut queue = QueueView::load(context).await.unwrap();
311    /// queue.push_back(34);
312    /// assert_eq!(queue.count(), 1);
313    /// # })
314    /// ```
315    pub fn count(&self) -> usize {
316        self.stored_count() as usize + self.new_back_values.len()
317    }
318
    /// Obtains the extra data, as exposed by the context's `extra()` accessor.
    pub fn extra(&self) -> &C::Extra {
        self.context.extra()
    }
323
324    async fn read_context(&self, range: Range<u32>) -> Result<Vec<T>, ViewError> {
325        let count = range.len();
326        let mut keys = Vec::with_capacity(count);
327        for index in range {
328            let key = self
329                .context
330                .base_key()
331                .derive_tag_key(KeyTag::Index as u8, &index)?;
332            keys.push(key)
333        }
334        let mut values = Vec::with_capacity(count);
335        for entry in self.context.store().read_multi_values(&keys).await? {
336            match entry {
337                None => {
338                    return Err(ViewError::MissingEntries("QueueView".into()));
339                }
340                Some(value) => values.push(value),
341            }
342        }
343        Ok(values)
344    }
345
346    /// Reads the `count` next values in the queue (including staged ones).
347    /// ```rust
348    /// # tokio_test::block_on(async {
349    /// # use linera_views::context::MemoryContext;
350    /// # use linera_views::queue_view::QueueView;
351    /// # use linera_views::views::View;
352    /// # let context = MemoryContext::new_for_testing(());
353    /// let mut queue = QueueView::load(context).await.unwrap();
354    /// queue.push_back(34);
355    /// queue.push_back(42);
356    /// assert_eq!(queue.read_front(1).await.unwrap(), vec![34]);
357    /// # })
358    /// ```
359    pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
360        if count > self.count() {
361            count = self.count();
362        }
363        if count == 0 {
364            return Ok(Vec::new());
365        }
366        let mut values = Vec::with_capacity(count);
367        if !self.delete_storage_first {
368            let stored_remainder = self.stored_count();
369            let start = self.stored_indices.end - stored_remainder;
370            if count <= stored_remainder as usize {
371                let count = u32::try_from(count).map_err(|_| ArithmeticError::Overflow)?;
372                values.extend(self.read_context(start..start + count).await?);
373            } else {
374                values.extend(self.read_context(start..self.stored_indices.end).await?);
375                values.extend(
376                    self.new_back_values
377                        .range(0..count - stored_remainder as usize)
378                        .cloned(),
379                );
380            }
381        } else {
382            values.extend(self.new_back_values.range(0..count).cloned());
383        }
384        Ok(values)
385    }
386
387    /// Reads the `count` last values in the queue (including staged ones).
388    /// ```rust
389    /// # tokio_test::block_on(async {
390    /// # use linera_views::context::MemoryContext;
391    /// # use linera_views::queue_view::QueueView;
392    /// # use linera_views::views::View;
393    /// # let context = MemoryContext::new_for_testing(());
394    /// let mut queue = QueueView::load(context).await.unwrap();
395    /// queue.push_back(34);
396    /// queue.push_back(42);
397    /// assert_eq!(queue.read_back(1).await.unwrap(), vec![42]);
398    /// # })
399    /// ```
400    pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
401        if count > self.count() {
402            count = self.count();
403        }
404        if count == 0 {
405            return Ok(Vec::new());
406        }
407        let mut values = Vec::with_capacity(count);
408        let new_back_len = self.new_back_values.len();
409        if count <= new_back_len || self.delete_storage_first {
410            values.extend(
411                self.new_back_values
412                    .range((new_back_len - count)..new_back_len)
413                    .cloned(),
414            );
415        } else {
416            let stored_consumed =
417                u32::try_from(count - new_back_len).map_err(|_| ArithmeticError::Underflow)?;
418            let start = self.stored_indices.end - stored_consumed;
419            values.extend(self.read_context(start..self.stored_indices.end).await?);
420            values.extend(self.new_back_values.iter().cloned());
421        }
422        Ok(values)
423    }
424
425    /// Reads all the elements
426    /// ```rust
427    /// # tokio_test::block_on(async {
428    /// # use linera_views::context::MemoryContext;
429    /// # use linera_views::queue_view::QueueView;
430    /// # use linera_views::views::View;
431    /// # let context = MemoryContext::new_for_testing(());
432    /// let mut queue = QueueView::load(context).await.unwrap();
433    /// queue.push_back(34);
434    /// queue.push_back(37);
435    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
436    /// # })
437    /// ```
438    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
439        let count = self.count();
440        self.read_front(count).await
441    }
442
443    async fn load_all(&mut self) -> Result<(), ViewError> {
444        if !self.delete_storage_first {
445            let stored_remainder = self.stored_count();
446            let start = self.stored_indices.end - stored_remainder;
447            let elements = self.read_context(start..self.stored_indices.end).await?;
448            for elt in elements {
449                self.new_back_values.push_back(elt);
450            }
451            self.new_back_values.rotate_right(stored_remainder as usize);
452            // All indices are being deleted at the next flush. This is because they are deleted either:
453            // * Because a self.front_delete_count forces them to be removed
454            // * Or because loading them means that their value can be changed which invalidates
455            //   the entries on storage
456            self.delete_storage_first = true;
457        }
458        Ok(())
459    }
460
    /// Gets a mutable iterator on the entries of the queue
    ///
    /// All remaining persisted entries are first loaded into memory (see `load_all`),
    /// so the iterator covers them as well as the staged values, from front to back.
    ///
    /// # Errors
    ///
    /// Fails if loading the persisted entries fails.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// let mut iter = queue.try_iter_mut().await.unwrap();
    /// let value = iter.next().unwrap();
    /// *value = 42;
    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
    /// # })
    /// ```
    pub async fn try_iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
480}
481
impl<C, T> HashableView for QueueView<C, T>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    type Hasher = sha3::Sha3_256;

    /// Computes the hash of the queue; simply delegates to `hash`.
    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.hash().await
    }

    /// Computes the SHA3-256 hash of all the elements of the queue (persisted and
    /// staged), fed to the hasher through `update_with_bcs_bytes`.
    ///
    /// # Errors
    ///
    /// Fails if the elements cannot be read or cannot be fed to the hasher.
    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        // The latency guard is kept alive until the end of the function.
        #[cfg(with_metrics)]
        let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}
502
/// Type wrapping `QueueView` while memoizing the hash.
///
/// NOTE(review): the hash is presumably kept as a `HasherOutput` — confirm against
/// `WrappedHashableContainerView`.
pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;

/// Wrapper around `QueueView` to compute hashes based on the history of changes.
pub type HistoricallyHashedQueueView<C, T> = HistoricallyHashableView<C, QueueView<C, T>>;
508
// GraphQL support for `QueueView`; only compiled when the `with_graphql` cfg is set.
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use linera_base::data_types::ArithmeticError;

    use super::QueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
        // Builds the type name as `QueueView_<mangled name of T>_<hash_name of T>`,
        // the last part being formatted as at least 8 zero-padded hexadecimal digits.
        fn type_name() -> Cow<'static, str> {
            format!(
                "QueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    // Plain `//` comments are used inside this impl on purpose: doc comments would be
    // picked up by the `Object` macro.
    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
    where
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Size of the queue as a `u32`; fails with `ArithmeticError::Overflow` if the
        // size does not fit.
        // NOTE(review): presumably exposed under the name `count` through the `derived`
        // attribute — confirm against the async-graphql documentation.
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            Ok(u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?)
        }

        // The first `count` entries of the queue, or all of them when `count` is absent.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}