linera_views/views/queue_view.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{vec_deque::IterMut, VecDeque},
    ops::Range,
};

#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use serde::{de::DeserializeOwned, Serialize};

use crate::{
    batch::Batch,
    common::{from_bytes_option_or_default, HasherOutput},
    context::Context,
    hashable_wrapper::WrappedHashableContainerView,
    historical_hash_wrapper::HistoricallyHashableView,
    store::ReadableKeyValueStore as _,
    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
};

#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// The runtime of hash computation
    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "queue_view_hash_runtime",
            "QueueView hash runtime",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });
}

/// Key tags to create the sub-keys of a `QueueView` on top of the base key.
#[repr(u8)]
enum KeyTag {
    /// Prefix for storing the variable `stored_indices`.
    Store = MIN_VIEW_TAG,
    /// Prefix for the indices of the queue.
    Index,
}

/// A view that supports a FIFO queue for values of type `T`.
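///
/// A minimal usage sketch, assuming the in-memory `MemoryContext` test context
/// used by the method examples below:
/// ```rust
/// # tokio_test::block_on(async {
/// # use linera_views::context::MemoryContext;
/// # use linera_views::queue_view::QueueView;
/// # use linera_views::views::View;
/// # let context = MemoryContext::new_for_testing(());
/// let mut queue = QueueView::load(context).await.unwrap();
/// queue.push_back(1);
/// queue.push_back(2);
/// queue.delete_front();
/// assert_eq!(queue.elements().await.unwrap(), vec![2]);
/// # })
/// ```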
#[derive(Debug)]
pub struct QueueView<C, T> {
    context: C,
    stored_indices: Range<usize>,
    front_delete_count: usize,
    delete_storage_first: bool,
    new_back_values: VecDeque<T>,
}

impl<C, T> View for QueueView<C, T>
where
    C: Context,
    T: Serialize + Send + Sync,
{
    const NUM_INIT_KEYS: usize = 1;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
    }

    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let stored_indices =
            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        Ok(Self {
            context,
            stored_indices,
            front_delete_count: 0,
            delete_storage_first: false,
            new_back_values: VecDeque::new(),
        })
    }

    fn rollback(&mut self) {
        self.delete_storage_first = false;
        self.front_delete_count = 0;
        self.new_back_values.clear();
    }

    async fn has_pending_changes(&self) -> bool {
        if self.delete_storage_first {
            return true;
        }
        if self.front_delete_count > 0 {
            return true;
        }
        !self.new_back_values.is_empty()
    }

    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
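        // Clearing the storage deletes everything under the base key; `delete_view` is
        // reset below if staged values remain to be written.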
        if self.delete_storage_first {
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            delete_view = true;
        }
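        // If no stored entry survives the pending front deletions, drop the whole index
        // prefix and reset the range; otherwise delete only the consumed front keys.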
        if self.stored_count() == 0 {
            let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
            batch.delete_key_prefix(key_prefix);
            self.stored_indices = Range::default();
        } else if self.front_delete_count > 0 {
            let deletion_range = self.stored_indices.clone().take(self.front_delete_count);
            self.stored_indices.start += self.front_delete_count;
            for index in deletion_range {
                let key = self
                    .context
                    .base_key()
                    .derive_tag_key(KeyTag::Index as u8, &index)?;
                batch.delete_key(key);
            }
        }
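        // Append the staged values after the stored ones, advancing the end of the index range.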
        if !self.new_back_values.is_empty() {
            delete_view = false;
            for value in &self.new_back_values {
                let key = self
                    .context
                    .base_key()
                    .derive_tag_key(KeyTag::Index as u8, &self.stored_indices.end)?;
                batch.put_key_value(key, value)?;
                self.stored_indices.end += 1;
            }
            self.new_back_values.clear();
        }
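        // Persist the updated index range, except when the view was cleared and nothing
        // new was appended.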
        if !self.delete_storage_first || !self.stored_indices.is_empty() {
            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
            batch.put_key_value(key, &self.stored_indices)?;
        }
        self.front_delete_count = 0;
        self.delete_storage_first = false;
        Ok(delete_view)
    }

    fn clear(&mut self) {
        self.delete_storage_first = true;
        self.new_back_values.clear();
    }
}

impl<C, T> ClonableView for QueueView<C, T>
where
    C: Context,
    T: Clone + Send + Sync + Serialize,
{
    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
        Ok(QueueView {
            context: self.context.clone(),
            stored_indices: self.stored_indices.clone(),
            front_delete_count: self.front_delete_count,
            delete_storage_first: self.delete_storage_first,
            new_back_values: self.new_back_values.clone(),
        })
    }
}

impl<C, T> QueueView<C, T> {
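    /// Returns the number of stored entries that are not scheduled for deletion.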
    fn stored_count(&self) -> usize {
        if self.delete_storage_first {
            0
        } else {
            self.stored_indices.len() - self.front_delete_count
        }
    }
}

impl<'a, C, T> QueueView<C, T>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
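    /// Reads the stored value at the given absolute index, if any.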
    async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
        let key = self
            .context
            .base_key()
            .derive_tag_key(KeyTag::Index as u8, &index)?;
        Ok(self.context.store().read_value(&key).await?)
    }

    /// Reads the front value, if any.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(42);
    /// assert_eq!(queue.front().await.unwrap(), Some(34));
    /// # })
    /// ```
    pub async fn front(&self) -> Result<Option<T>, ViewError> {
        let stored_remainder = self.stored_count();
        let value = if stored_remainder > 0 {
            self.get(self.stored_indices.end - stored_remainder).await?
        } else {
            self.new_back_values.front().cloned()
        };
        Ok(value)
    }

    /// Reads the back value, if any.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(42);
    /// assert_eq!(queue.back().await.unwrap(), Some(42));
    /// # })
    /// ```
    pub async fn back(&self) -> Result<Option<T>, ViewError> {
        Ok(match self.new_back_values.back() {
            Some(value) => Some(value.clone()),
            None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
            _ => None,
        })
    }

    /// Deletes the front value, if any.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34 as u128);
    /// queue.delete_front();
    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
    /// # })
    /// ```
    pub fn delete_front(&mut self) {
        if self.stored_count() > 0 {
            self.front_delete_count += 1;
        } else {
            self.new_back_values.pop_front();
        }
    }

    /// Pushes a value to the end of the queue.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(37);
    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
    /// # })
    /// ```
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }

    /// Reads the size of the queue.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// assert_eq!(queue.count(), 1);
    /// # })
    /// ```
    pub fn count(&self) -> usize {
        self.stored_count() + self.new_back_values.len()
    }

    /// Obtains the extra data.
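    ///
    /// A minimal sketch, assuming a `MemoryContext` whose extra data is `()`:
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let queue: QueueView<_, u8> = QueueView::load(context).await.unwrap();
    /// assert_eq!(*queue.extra(), ());
    /// # })
    /// ```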
    pub fn extra(&self) -> &C::Extra {
        self.context.extra()
    }

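    /// Reads the stored values at the given range of absolute indices, failing if any
    /// entry is missing.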
    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
        let count = range.len();
        let mut keys = Vec::with_capacity(count);
        for index in range {
            let key = self
                .context
                .base_key()
                .derive_tag_key(KeyTag::Index as u8, &index)?;
            keys.push(key)
        }
        let mut values = Vec::with_capacity(count);
        for entry in self.context.store().read_multi_values(keys).await? {
            match entry {
                None => {
                    return Err(ViewError::MissingEntries);
                }
                Some(value) => values.push(value),
            }
        }
        Ok(values)
    }

    /// Reads the `count` next values in the queue (including staged ones).
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(42);
    /// assert_eq!(queue.read_front(1).await.unwrap(), vec![34]);
    /// # })
    /// ```
    pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
        if count > self.count() {
            count = self.count();
        }
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut values = Vec::with_capacity(count);
        if !self.delete_storage_first {
            let stored_remainder = self.stored_count();
            let start = self.stored_indices.end - stored_remainder;
            if count <= stored_remainder {
                values.extend(self.read_context(start..(start + count)).await?);
            } else {
                values.extend(self.read_context(start..self.stored_indices.end).await?);
                values.extend(
                    self.new_back_values
                        .range(0..(count - stored_remainder))
                        .cloned(),
                );
            }
        } else {
            values.extend(self.new_back_values.range(0..count).cloned());
        }
        Ok(values)
    }

    /// Reads the `count` last values in the queue (including staged ones).
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(42);
    /// assert_eq!(queue.read_back(1).await.unwrap(), vec![42]);
    /// # })
    /// ```
    pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
        if count > self.count() {
            count = self.count();
        }
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut values = Vec::with_capacity(count);
        let new_back_len = self.new_back_values.len();
        if count <= new_back_len || self.delete_storage_first {
            values.extend(
                self.new_back_values
                    .range((new_back_len - count)..new_back_len)
                    .cloned(),
            );
        } else {
            let start = self.stored_indices.end + new_back_len - count;
            values.extend(self.read_context(start..self.stored_indices.end).await?);
            values.extend(self.new_back_values.iter().cloned());
        }
        Ok(values)
    }

    /// Reads all the elements.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// queue.push_back(37);
    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
    /// # })
    /// ```
    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
        let count = self.count();
        self.read_front(count).await
    }

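    /// Loads all stored entries into `new_back_values` so that they can be modified in memory.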
    async fn load_all(&mut self) -> Result<(), ViewError> {
        if !self.delete_storage_first {
            let stored_remainder = self.stored_count();
            let start = self.stored_indices.end - stored_remainder;
            let elements = self.read_context(start..self.stored_indices.end).await?;
            let shift = self.stored_indices.end - start;
            for elt in elements {
                self.new_back_values.push_back(elt);
            }
            self.new_back_values.rotate_right(shift);
            // All stored indices will be deleted at the next flush, either because
            // `front_delete_count` forces their removal, or because loading them means
            // their values can now be modified in memory, which invalidates the entries
            // in storage.
            self.delete_storage_first = true;
        }
        Ok(())
    }

    /// Gets a mutable iterator over the entries of the queue.
    /// ```rust
    /// # tokio_test::block_on(async {
    /// # use linera_views::context::MemoryContext;
    /// # use linera_views::queue_view::QueueView;
    /// # use linera_views::views::View;
    /// # let context = MemoryContext::new_for_testing(());
    /// let mut queue = QueueView::load(context).await.unwrap();
    /// queue.push_back(34);
    /// let mut iter = queue.iter_mut().await.unwrap();
    /// let value = iter.next().unwrap();
    /// *value = 42;
    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
    /// # })
    /// ```
    pub async fn iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
}

impl<C, T> HashableView for QueueView<C, T>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    type Hasher = sha3::Sha3_256;

    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.hash().await
    }

    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}

/// Type wrapping `QueueView` while memoizing the hash.
pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;

/// Wrapper around `QueueView` to compute hashes based on the history of changes.
pub type HistoricallyHashedQueueView<C, T> = HistoricallyHashableView<C, QueueView<C, T>>;

#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::QueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
        fn type_name() -> Cow<'static, str> {
            format!(
                "QueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
    where
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            Ok(self.count() as u32)
        }

        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}