linera_views/views/
queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{vec_deque::IterMut, VecDeque},
6    ops::Range,
7};
8
9use allocative::Allocative;
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::visit_allocative_simple;
13use serde::{de::DeserializeOwned, Serialize};
14
15use crate::{
16    batch::Batch,
17    common::{from_bytes_option_or_default, HasherOutput},
18    context::Context,
19    hashable_wrapper::WrappedHashableContainerView,
20    historical_hash_wrapper::HistoricallyHashableView,
21    store::ReadableKeyValueStore as _,
22    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
23};
24
25#[cfg(with_metrics)]
26mod metrics {
27    use std::sync::LazyLock;
28
29    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
30    use prometheus::HistogramVec;
31
32    /// The runtime of hash computation
33    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
34        register_histogram_vec(
35            "queue_view_hash_runtime",
36            "QueueView hash runtime",
37            &[],
38            exponential_bucket_latencies(5.0),
39        )
40    });
41}
42
43/// Key tags to create the sub-keys of a `QueueView` on top of the base key.
44#[repr(u8)]
45enum KeyTag {
46    /// Prefix for the storing of the variable `stored_indices`.
47    Store = MIN_VIEW_TAG,
48    /// Prefix for the indices of the log.
49    Index,
50}
51
52/// A view that supports a FIFO queue for values of type `T`.
53#[derive(Debug, Allocative)]
54#[allocative(bound = "C, T: Allocative")]
55pub struct QueueView<C, T> {
56    /// The view context.
57    #[allocative(skip)]
58    context: C,
59    /// The range of indices for entries persisted in storage.
60    #[allocative(visit = visit_allocative_simple)]
61    stored_indices: Range<usize>,
62    /// The number of entries to delete from the front.
63    front_delete_count: usize,
64    /// Whether to clear storage before applying updates.
65    delete_storage_first: bool,
66    /// New values added to the back, not yet persisted to storage.
67    new_back_values: VecDeque<T>,
68}
69
70impl<C, T> View for QueueView<C, T>
71where
72    C: Context,
73    T: Serialize + Send + Sync,
74{
75    const NUM_INIT_KEYS: usize = 1;
76
77    type Context = C;
78
79    fn context(&self) -> C {
80        self.context.clone()
81    }
82
83    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
84        Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
85    }
86
87    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
88        let stored_indices =
89            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
90        Ok(Self {
91            context,
92            stored_indices,
93            front_delete_count: 0,
94            delete_storage_first: false,
95            new_back_values: VecDeque::new(),
96        })
97    }
98
99    fn rollback(&mut self) {
100        self.delete_storage_first = false;
101        self.front_delete_count = 0;
102        self.new_back_values.clear();
103    }
104
105    async fn has_pending_changes(&self) -> bool {
106        if self.delete_storage_first {
107            return true;
108        }
109        if self.front_delete_count > 0 {
110            return true;
111        }
112        !self.new_back_values.is_empty()
113    }
114
115    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
116        let mut delete_view = false;
117        if self.delete_storage_first {
118            batch.delete_key_prefix(self.context.base_key().bytes.clone());
119            delete_view = true;
120        }
121        let mut new_stored_indices = self.stored_indices.clone();
122        if self.stored_count() == 0 {
123            let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
124            batch.delete_key_prefix(key_prefix);
125            new_stored_indices = Range::default();
126        } else if self.front_delete_count > 0 {
127            let deletion_range = self.stored_indices.clone().take(self.front_delete_count);
128            new_stored_indices.start += self.front_delete_count;
129            for index in deletion_range {
130                let key = self
131                    .context
132                    .base_key()
133                    .derive_tag_key(KeyTag::Index as u8, &index)?;
134                batch.delete_key(key);
135            }
136        }
137        if !self.new_back_values.is_empty() {
138            delete_view = false;
139            for value in &self.new_back_values {
140                let key = self
141                    .context
142                    .base_key()
143                    .derive_tag_key(KeyTag::Index as u8, &new_stored_indices.end)?;
144                batch.put_key_value(key, value)?;
145                new_stored_indices.end += 1;
146            }
147        }
148        if !self.delete_storage_first || !new_stored_indices.is_empty() {
149            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
150            batch.put_key_value(key, &new_stored_indices)?;
151        }
152        Ok(delete_view)
153    }
154
155    fn post_save(&mut self) {
156        if self.stored_count() == 0 {
157            self.stored_indices = Range::default();
158        } else if self.front_delete_count > 0 {
159            self.stored_indices.start += self.front_delete_count;
160        }
161        if !self.new_back_values.is_empty() {
162            self.stored_indices.end += self.new_back_values.len();
163            self.new_back_values.clear();
164        }
165        self.front_delete_count = 0;
166        self.delete_storage_first = false;
167    }
168
169    fn clear(&mut self) {
170        self.delete_storage_first = true;
171        self.new_back_values.clear();
172    }
173}
174
175impl<C, T> ClonableView for QueueView<C, T>
176where
177    C: Context,
178    T: Clone + Send + Sync + Serialize,
179{
180    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
181        Ok(QueueView {
182            context: self.context.clone(),
183            stored_indices: self.stored_indices.clone(),
184            front_delete_count: self.front_delete_count,
185            delete_storage_first: self.delete_storage_first,
186            new_back_values: self.new_back_values.clone(),
187        })
188    }
189}
190
191impl<C, T> QueueView<C, T> {
192    fn stored_count(&self) -> usize {
193        if self.delete_storage_first {
194            0
195        } else {
196            self.stored_indices.len() - self.front_delete_count
197        }
198    }
199}
200
201impl<'a, C, T> QueueView<C, T>
202where
203    C: Context,
204    T: Send + Sync + Clone + Serialize + DeserializeOwned,
205{
206    async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
207        let key = self
208            .context
209            .base_key()
210            .derive_tag_key(KeyTag::Index as u8, &index)?;
211        Ok(self.context.store().read_value(&key).await?)
212    }
213
214    /// Reads the front value, if any.
215    /// ```rust
216    /// # tokio_test::block_on(async {
217    /// # use linera_views::context::MemoryContext;
218    /// # use linera_views::queue_view::QueueView;
219    /// # use linera_views::views::View;
220    /// # let context = MemoryContext::new_for_testing(());
221    /// let mut queue = QueueView::load(context).await.unwrap();
222    /// queue.push_back(34);
223    /// queue.push_back(42);
224    /// assert_eq!(queue.front().await.unwrap(), Some(34));
225    /// # })
226    /// ```
227    pub async fn front(&self) -> Result<Option<T>, ViewError> {
228        let stored_remainder = self.stored_count();
229        let value = if stored_remainder > 0 {
230            self.get(self.stored_indices.end - stored_remainder).await?
231        } else {
232            self.new_back_values.front().cloned()
233        };
234        Ok(value)
235    }
236
237    /// Reads the back value, if any.
238    /// ```rust
239    /// # tokio_test::block_on(async {
240    /// # use linera_views::context::MemoryContext;
241    /// # use linera_views::queue_view::QueueView;
242    /// # use linera_views::views::View;
243    /// # let context = MemoryContext::new_for_testing(());
244    /// let mut queue = QueueView::load(context).await.unwrap();
245    /// queue.push_back(34);
246    /// queue.push_back(42);
247    /// assert_eq!(queue.back().await.unwrap(), Some(42));
248    /// # })
249    /// ```
250    pub async fn back(&self) -> Result<Option<T>, ViewError> {
251        Ok(match self.new_back_values.back() {
252            Some(value) => Some(value.clone()),
253            None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
254            _ => None,
255        })
256    }
257
258    /// Deletes the front value, if any.
259    /// ```rust
260    /// # tokio_test::block_on(async {
261    /// # use linera_views::context::MemoryContext;
262    /// # use linera_views::queue_view::QueueView;
263    /// # use linera_views::views::View;
264    /// # let context = MemoryContext::new_for_testing(());
265    /// let mut queue = QueueView::load(context).await.unwrap();
266    /// queue.push_back(34 as u128);
267    /// queue.delete_front();
268    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
269    /// # })
270    /// ```
271    pub fn delete_front(&mut self) {
272        if self.stored_count() > 0 {
273            self.front_delete_count += 1;
274        } else {
275            self.new_back_values.pop_front();
276        }
277    }
278
279    /// Pushes a value to the end of the queue.
280    /// ```rust
281    /// # tokio_test::block_on(async {
282    /// # use linera_views::context::MemoryContext;
283    /// # use linera_views::queue_view::QueueView;
284    /// # use linera_views::views::View;
285    /// # let context = MemoryContext::new_for_testing(());
286    /// let mut queue = QueueView::load(context).await.unwrap();
287    /// queue.push_back(34);
288    /// queue.push_back(37);
289    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
290    /// # })
291    /// ```
292    pub fn push_back(&mut self, value: T) {
293        self.new_back_values.push_back(value);
294    }
295
296    /// Reads the size of the queue.
297    /// ```rust
298    /// # tokio_test::block_on(async {
299    /// # use linera_views::context::MemoryContext;
300    /// # use linera_views::queue_view::QueueView;
301    /// # use linera_views::views::View;
302    /// # let context = MemoryContext::new_for_testing(());
303    /// let mut queue = QueueView::load(context).await.unwrap();
304    /// queue.push_back(34);
305    /// assert_eq!(queue.count(), 1);
306    /// # })
307    /// ```
308    pub fn count(&self) -> usize {
309        self.stored_count() + self.new_back_values.len()
310    }
311
312    /// Obtains the extra data.
313    pub fn extra(&self) -> &C::Extra {
314        self.context.extra()
315    }
316
317    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
318        let count = range.len();
319        let mut keys = Vec::with_capacity(count);
320        for index in range {
321            let key = self
322                .context
323                .base_key()
324                .derive_tag_key(KeyTag::Index as u8, &index)?;
325            keys.push(key)
326        }
327        let mut values = Vec::with_capacity(count);
328        for entry in self.context.store().read_multi_values(&keys).await? {
329            match entry {
330                None => {
331                    return Err(ViewError::MissingEntries("QueueView".into()));
332                }
333                Some(value) => values.push(value),
334            }
335        }
336        Ok(values)
337    }
338
339    /// Reads the `count` next values in the queue (including staged ones).
340    /// ```rust
341    /// # tokio_test::block_on(async {
342    /// # use linera_views::context::MemoryContext;
343    /// # use linera_views::queue_view::QueueView;
344    /// # use linera_views::views::View;
345    /// # let context = MemoryContext::new_for_testing(());
346    /// let mut queue = QueueView::load(context).await.unwrap();
347    /// queue.push_back(34);
348    /// queue.push_back(42);
349    /// assert_eq!(queue.read_front(1).await.unwrap(), vec![34]);
350    /// # })
351    /// ```
352    pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
353        if count > self.count() {
354            count = self.count();
355        }
356        if count == 0 {
357            return Ok(Vec::new());
358        }
359        let mut values = Vec::with_capacity(count);
360        if !self.delete_storage_first {
361            let stored_remainder = self.stored_count();
362            let start = self.stored_indices.end - stored_remainder;
363            if count <= stored_remainder {
364                values.extend(self.read_context(start..(start + count)).await?);
365            } else {
366                values.extend(self.read_context(start..self.stored_indices.end).await?);
367                values.extend(
368                    self.new_back_values
369                        .range(0..(count - stored_remainder))
370                        .cloned(),
371                );
372            }
373        } else {
374            values.extend(self.new_back_values.range(0..count).cloned());
375        }
376        Ok(values)
377    }
378
379    /// Reads the `count` last values in the queue (including staged ones).
380    /// ```rust
381    /// # tokio_test::block_on(async {
382    /// # use linera_views::context::MemoryContext;
383    /// # use linera_views::queue_view::QueueView;
384    /// # use linera_views::views::View;
385    /// # let context = MemoryContext::new_for_testing(());
386    /// let mut queue = QueueView::load(context).await.unwrap();
387    /// queue.push_back(34);
388    /// queue.push_back(42);
389    /// assert_eq!(queue.read_back(1).await.unwrap(), vec![42]);
390    /// # })
391    /// ```
392    pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
393        if count > self.count() {
394            count = self.count();
395        }
396        if count == 0 {
397            return Ok(Vec::new());
398        }
399        let mut values = Vec::with_capacity(count);
400        let new_back_len = self.new_back_values.len();
401        if count <= new_back_len || self.delete_storage_first {
402            values.extend(
403                self.new_back_values
404                    .range((new_back_len - count)..new_back_len)
405                    .cloned(),
406            );
407        } else {
408            let start = self.stored_indices.end + new_back_len - count;
409            values.extend(self.read_context(start..self.stored_indices.end).await?);
410            values.extend(self.new_back_values.iter().cloned());
411        }
412        Ok(values)
413    }
414
415    /// Reads all the elements
416    /// ```rust
417    /// # tokio_test::block_on(async {
418    /// # use linera_views::context::MemoryContext;
419    /// # use linera_views::queue_view::QueueView;
420    /// # use linera_views::views::View;
421    /// # let context = MemoryContext::new_for_testing(());
422    /// let mut queue = QueueView::load(context).await.unwrap();
423    /// queue.push_back(34);
424    /// queue.push_back(37);
425    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
426    /// # })
427    /// ```
428    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
429        let count = self.count();
430        self.read_front(count).await
431    }
432
433    async fn load_all(&mut self) -> Result<(), ViewError> {
434        if !self.delete_storage_first {
435            let stored_remainder = self.stored_count();
436            let start = self.stored_indices.end - stored_remainder;
437            let elements = self.read_context(start..self.stored_indices.end).await?;
438            let shift = self.stored_indices.end - start;
439            for elt in elements {
440                self.new_back_values.push_back(elt);
441            }
442            self.new_back_values.rotate_right(shift);
443            // All indices are being deleted at the next flush. This is because they are deleted either:
444            // * Because a self.front_delete_count forces them to be removed
445            // * Or because loading them means that their value can be changed which invalidates
446            //   the entries on storage
447            self.delete_storage_first = true;
448        }
449        Ok(())
450    }
451
452    /// Gets a mutable iterator on the entries of the queue
453    /// ```rust
454    /// # tokio_test::block_on(async {
455    /// # use linera_views::context::MemoryContext;
456    /// # use linera_views::queue_view::QueueView;
457    /// # use linera_views::views::View;
458    /// # let context = MemoryContext::new_for_testing(());
459    /// let mut queue = QueueView::load(context).await.unwrap();
460    /// queue.push_back(34);
461    /// let mut iter = queue.iter_mut().await.unwrap();
462    /// let value = iter.next().unwrap();
463    /// *value = 42;
464    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
465    /// # })
466    /// ```
467    pub async fn iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
468        self.load_all().await?;
469        Ok(self.new_back_values.iter_mut())
470    }
471}
472
473impl<C, T> HashableView for QueueView<C, T>
474where
475    C: Context,
476    T: Send + Sync + Clone + Serialize + DeserializeOwned,
477{
478    type Hasher = sha3::Sha3_256;
479
480    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
481        self.hash().await
482    }
483
484    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
485        #[cfg(with_metrics)]
486        let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
487        let elements = self.elements().await?;
488        let mut hasher = sha3::Sha3_256::default();
489        hasher.update_with_bcs_bytes(&elements)?;
490        Ok(hasher.finalize())
491    }
492}
493
494/// Type wrapping `QueueView` while memoizing the hash.
495pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;
496
497/// Wrapper around `QueueView` to compute hashes based on the history of changes.
498pub type HistoricallyHashedQueueView<C, T> = HistoricallyHashableView<C, QueueView<C, T>>;
499
500#[cfg(with_graphql)]
501mod graphql {
502    use std::borrow::Cow;
503
504    use super::QueueView;
505    use crate::{
506        context::Context,
507        graphql::{hash_name, mangle},
508    };
509
510    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
511        fn type_name() -> Cow<'static, str> {
512            format!(
513                "QueueView_{}_{:08x}",
514                mangle(T::type_name()),
515                hash_name::<T>()
516            )
517            .into()
518        }
519    }
520
521    #[async_graphql::Object(cache_control(no_cache), name_type)]
522    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
523    where
524        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
525    {
526        #[graphql(derived(name = "count"))]
527        async fn count_(&self) -> Result<u32, async_graphql::Error> {
528            Ok(self.count() as u32)
529        }
530
531        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
532            Ok(self
533                .read_front(count.unwrap_or_else(|| self.count()))
534                .await?)
535        }
536    }
537}