linera_views/views/
queue_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{vec_deque::IterMut, VecDeque},
6    ops::Range,
7};
8
9#[cfg(with_metrics)]
10use linera_base::prometheus_util::MeasureLatency as _;
11use serde::{de::DeserializeOwned, Serialize};
12
13use crate::{
14    batch::Batch,
15    common::{from_bytes_option_or_default, HasherOutput},
16    context::Context,
17    hashable_wrapper::WrappedHashableContainerView,
18    store::ReadableKeyValueStore as _,
19    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
20};
21
22#[cfg(with_metrics)]
23mod metrics {
24    use std::sync::LazyLock;
25
26    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
27    use prometheus::HistogramVec;
28
29    /// The runtime of hash computation
30    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
31        register_histogram_vec(
32            "queue_view_hash_runtime",
33            "QueueView hash runtime",
34            &[],
35            exponential_bucket_latencies(5.0),
36        )
37    });
38}
39
40/// Key tags to create the sub-keys of a `QueueView` on top of the base key.
41#[repr(u8)]
42enum KeyTag {
43    /// Prefix for the storing of the variable `stored_indices`.
44    Store = MIN_VIEW_TAG,
45    /// Prefix for the indices of the log.
46    Index,
47}
48
49/// A view that supports a FIFO queue for values of type `T`.
50#[derive(Debug)]
51pub struct QueueView<C, T> {
52    context: C,
53    stored_indices: Range<usize>,
54    front_delete_count: usize,
55    delete_storage_first: bool,
56    new_back_values: VecDeque<T>,
57}
58
59impl<C, T> View for QueueView<C, T>
60where
61    C: Context,
62    T: Serialize + Send + Sync,
63{
64    const NUM_INIT_KEYS: usize = 1;
65
66    type Context = C;
67
68    fn context(&self) -> &C {
69        &self.context
70    }
71
72    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
73        Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
74    }
75
76    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
77        let stored_indices =
78            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
79        Ok(Self {
80            context,
81            stored_indices,
82            front_delete_count: 0,
83            delete_storage_first: false,
84            new_back_values: VecDeque::new(),
85        })
86    }
87
88    async fn load(context: C) -> Result<Self, ViewError> {
89        let keys = Self::pre_load(&context)?;
90        let values = context.store().read_multi_values_bytes(keys).await?;
91        Self::post_load(context, &values)
92    }
93
94    fn rollback(&mut self) {
95        self.delete_storage_first = false;
96        self.front_delete_count = 0;
97        self.new_back_values.clear();
98    }
99
100    async fn has_pending_changes(&self) -> bool {
101        if self.delete_storage_first {
102            return true;
103        }
104        if self.front_delete_count > 0 {
105            return true;
106        }
107        !self.new_back_values.is_empty()
108    }
109
110    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
111        let mut delete_view = false;
112        if self.delete_storage_first {
113            batch.delete_key_prefix(self.context.base_key().bytes.clone());
114            delete_view = true;
115        }
116        if self.stored_count() == 0 {
117            let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
118            batch.delete_key_prefix(key_prefix);
119            self.stored_indices = Range::default();
120        } else if self.front_delete_count > 0 {
121            let deletion_range = self.stored_indices.clone().take(self.front_delete_count);
122            self.stored_indices.start += self.front_delete_count;
123            for index in deletion_range {
124                let key = self
125                    .context
126                    .base_key()
127                    .derive_tag_key(KeyTag::Index as u8, &index)?;
128                batch.delete_key(key);
129            }
130        }
131        if !self.new_back_values.is_empty() {
132            delete_view = false;
133            for value in &self.new_back_values {
134                let key = self
135                    .context
136                    .base_key()
137                    .derive_tag_key(KeyTag::Index as u8, &self.stored_indices.end)?;
138                batch.put_key_value(key, value)?;
139                self.stored_indices.end += 1;
140            }
141            self.new_back_values.clear();
142        }
143        if !self.delete_storage_first || !self.stored_indices.is_empty() {
144            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
145            batch.put_key_value(key, &self.stored_indices)?;
146        }
147        self.front_delete_count = 0;
148        self.delete_storage_first = false;
149        Ok(delete_view)
150    }
151
152    fn clear(&mut self) {
153        self.delete_storage_first = true;
154        self.new_back_values.clear();
155    }
156}
157
158impl<C, T> ClonableView for QueueView<C, T>
159where
160    C: Context,
161    T: Clone + Send + Sync + Serialize,
162{
163    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
164        Ok(QueueView {
165            context: self.context.clone(),
166            stored_indices: self.stored_indices.clone(),
167            front_delete_count: self.front_delete_count,
168            delete_storage_first: self.delete_storage_first,
169            new_back_values: self.new_back_values.clone(),
170        })
171    }
172}
173
174impl<C, T> QueueView<C, T> {
175    fn stored_count(&self) -> usize {
176        if self.delete_storage_first {
177            0
178        } else {
179            self.stored_indices.len() - self.front_delete_count
180        }
181    }
182}
183
184impl<'a, C, T> QueueView<C, T>
185where
186    C: Context,
187    T: Send + Sync + Clone + Serialize + DeserializeOwned,
188{
189    async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
190        let key = self
191            .context
192            .base_key()
193            .derive_tag_key(KeyTag::Index as u8, &index)?;
194        Ok(self.context.store().read_value(&key).await?)
195    }
196
197    /// Reads the front value, if any.
198    /// ```rust
199    /// # tokio_test::block_on(async {
200    /// # use linera_views::context::MemoryContext;
201    /// # use linera_views::queue_view::QueueView;
202    /// # use linera_views::views::View;
203    /// # let context = MemoryContext::new_for_testing(());
204    /// let mut queue = QueueView::load(context).await.unwrap();
205    /// queue.push_back(34);
206    /// queue.push_back(42);
207    /// assert_eq!(queue.front().await.unwrap(), Some(34));
208    /// # })
209    /// ```
210    pub async fn front(&self) -> Result<Option<T>, ViewError> {
211        let stored_remainder = self.stored_count();
212        let value = if stored_remainder > 0 {
213            self.get(self.stored_indices.end - stored_remainder).await?
214        } else {
215            self.new_back_values.front().cloned()
216        };
217        Ok(value)
218    }
219
220    /// Reads the back value, if any.
221    /// ```rust
222    /// # tokio_test::block_on(async {
223    /// # use linera_views::context::MemoryContext;
224    /// # use linera_views::queue_view::QueueView;
225    /// # use linera_views::views::View;
226    /// # let context = MemoryContext::new_for_testing(());
227    /// let mut queue = QueueView::load(context).await.unwrap();
228    /// queue.push_back(34);
229    /// queue.push_back(42);
230    /// assert_eq!(queue.back().await.unwrap(), Some(42));
231    /// # })
232    /// ```
233    pub async fn back(&self) -> Result<Option<T>, ViewError> {
234        Ok(match self.new_back_values.back() {
235            Some(value) => Some(value.clone()),
236            None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
237            _ => None,
238        })
239    }
240
241    /// Deletes the front value, if any.
242    /// ```rust
243    /// # tokio_test::block_on(async {
244    /// # use linera_views::context::MemoryContext;
245    /// # use linera_views::queue_view::QueueView;
246    /// # use linera_views::views::View;
247    /// # let context = MemoryContext::new_for_testing(());
248    /// let mut queue = QueueView::load(context).await.unwrap();
249    /// queue.push_back(34 as u128);
250    /// queue.delete_front();
251    /// assert_eq!(queue.elements().await.unwrap(), Vec::<u128>::new());
252    /// # })
253    /// ```
254    pub fn delete_front(&mut self) {
255        if self.stored_count() > 0 {
256            self.front_delete_count += 1;
257        } else {
258            self.new_back_values.pop_front();
259        }
260    }
261
262    /// Pushes a value to the end of the queue.
263    /// ```rust
264    /// # tokio_test::block_on(async {
265    /// # use linera_views::context::MemoryContext;
266    /// # use linera_views::queue_view::QueueView;
267    /// # use linera_views::views::View;
268    /// # let context = MemoryContext::new_for_testing(());
269    /// let mut queue = QueueView::load(context).await.unwrap();
270    /// queue.push_back(34);
271    /// queue.push_back(37);
272    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
273    /// # })
274    /// ```
275    pub fn push_back(&mut self, value: T) {
276        self.new_back_values.push_back(value);
277    }
278
279    /// Reads the size of the queue.
280    /// ```rust
281    /// # tokio_test::block_on(async {
282    /// # use linera_views::context::MemoryContext;
283    /// # use linera_views::queue_view::QueueView;
284    /// # use linera_views::views::View;
285    /// # let context = MemoryContext::new_for_testing(());
286    /// let mut queue = QueueView::load(context).await.unwrap();
287    /// queue.push_back(34);
288    /// assert_eq!(queue.count(), 1);
289    /// # })
290    /// ```
291    pub fn count(&self) -> usize {
292        self.stored_count() + self.new_back_values.len()
293    }
294
295    /// Obtains the extra data.
296    pub fn extra(&self) -> &C::Extra {
297        self.context.extra()
298    }
299
300    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
301        let count = range.len();
302        let mut keys = Vec::with_capacity(count);
303        for index in range {
304            let key = self
305                .context
306                .base_key()
307                .derive_tag_key(KeyTag::Index as u8, &index)?;
308            keys.push(key)
309        }
310        let mut values = Vec::with_capacity(count);
311        for entry in self.context.store().read_multi_values(keys).await? {
312            match entry {
313                None => {
314                    return Err(ViewError::MissingEntries);
315                }
316                Some(value) => values.push(value),
317            }
318        }
319        Ok(values)
320    }
321
322    /// Reads the `count` next values in the queue (including staged ones).
323    /// ```rust
324    /// # tokio_test::block_on(async {
325    /// # use linera_views::context::MemoryContext;
326    /// # use linera_views::queue_view::QueueView;
327    /// # use linera_views::views::View;
328    /// # let context = MemoryContext::new_for_testing(());
329    /// let mut queue = QueueView::load(context).await.unwrap();
330    /// queue.push_back(34);
331    /// queue.push_back(42);
332    /// assert_eq!(queue.read_front(1).await.unwrap(), vec![34]);
333    /// # })
334    /// ```
335    pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
336        if count > self.count() {
337            count = self.count();
338        }
339        if count == 0 {
340            return Ok(Vec::new());
341        }
342        let mut values = Vec::with_capacity(count);
343        if !self.delete_storage_first {
344            let stored_remainder = self.stored_count();
345            let start = self.stored_indices.end - stored_remainder;
346            if count <= stored_remainder {
347                values.extend(self.read_context(start..(start + count)).await?);
348            } else {
349                values.extend(self.read_context(start..self.stored_indices.end).await?);
350                values.extend(
351                    self.new_back_values
352                        .range(0..(count - stored_remainder))
353                        .cloned(),
354                );
355            }
356        } else {
357            values.extend(self.new_back_values.range(0..count).cloned());
358        }
359        Ok(values)
360    }
361
362    /// Reads the `count` last values in the queue (including staged ones).
363    /// ```rust
364    /// # tokio_test::block_on(async {
365    /// # use linera_views::context::MemoryContext;
366    /// # use linera_views::queue_view::QueueView;
367    /// # use linera_views::views::View;
368    /// # let context = MemoryContext::new_for_testing(());
369    /// let mut queue = QueueView::load(context).await.unwrap();
370    /// queue.push_back(34);
371    /// queue.push_back(42);
372    /// assert_eq!(queue.read_back(1).await.unwrap(), vec![42]);
373    /// # })
374    /// ```
375    pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
376        if count > self.count() {
377            count = self.count();
378        }
379        if count == 0 {
380            return Ok(Vec::new());
381        }
382        let mut values = Vec::with_capacity(count);
383        let new_back_len = self.new_back_values.len();
384        if count <= new_back_len || self.delete_storage_first {
385            values.extend(
386                self.new_back_values
387                    .range((new_back_len - count)..new_back_len)
388                    .cloned(),
389            );
390        } else {
391            let start = self.stored_indices.end + new_back_len - count;
392            values.extend(self.read_context(start..self.stored_indices.end).await?);
393            values.extend(self.new_back_values.iter().cloned());
394        }
395        Ok(values)
396    }
397
398    /// Reads all the elements
399    /// ```rust
400    /// # tokio_test::block_on(async {
401    /// # use linera_views::context::MemoryContext;
402    /// # use linera_views::queue_view::QueueView;
403    /// # use linera_views::views::View;
404    /// # let context = MemoryContext::new_for_testing(());
405    /// let mut queue = QueueView::load(context).await.unwrap();
406    /// queue.push_back(34);
407    /// queue.push_back(37);
408    /// assert_eq!(queue.elements().await.unwrap(), vec![34, 37]);
409    /// # })
410    /// ```
411    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
412        let count = self.count();
413        self.read_front(count).await
414    }
415
416    async fn load_all(&mut self) -> Result<(), ViewError> {
417        if !self.delete_storage_first {
418            let stored_remainder = self.stored_count();
419            let start = self.stored_indices.end - stored_remainder;
420            let elements = self.read_context(start..self.stored_indices.end).await?;
421            let shift = self.stored_indices.end - start;
422            for elt in elements {
423                self.new_back_values.push_back(elt);
424            }
425            self.new_back_values.rotate_right(shift);
426            // All indices are being deleted at the next flush. This is because they are deleted either:
427            // * Because a self.front_delete_count forces them to be removed
428            // * Or because loading them means that their value can be changed which invalidates
429            //   the entries on storage
430            self.delete_storage_first = true;
431        }
432        Ok(())
433    }
434
435    /// Gets a mutable iterator on the entries of the queue
436    /// ```rust
437    /// # tokio_test::block_on(async {
438    /// # use linera_views::context::MemoryContext;
439    /// # use linera_views::queue_view::QueueView;
440    /// # use linera_views::views::View;
441    /// # let context = MemoryContext::new_for_testing(());
442    /// let mut queue = QueueView::load(context).await.unwrap();
443    /// queue.push_back(34);
444    /// let mut iter = queue.iter_mut().await.unwrap();
445    /// let value = iter.next().unwrap();
446    /// *value = 42;
447    /// assert_eq!(queue.elements().await.unwrap(), vec![42]);
448    /// # })
449    /// ```
450    pub async fn iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
451        self.load_all().await?;
452        Ok(self.new_back_values.iter_mut())
453    }
454}
455
456impl<C, T> HashableView for QueueView<C, T>
457where
458    C: Context,
459    T: Send + Sync + Clone + Serialize + DeserializeOwned,
460{
461    type Hasher = sha3::Sha3_256;
462
463    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
464        self.hash().await
465    }
466
467    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
468        #[cfg(with_metrics)]
469        let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
470        let elements = self.elements().await?;
471        let mut hasher = sha3::Sha3_256::default();
472        hasher.update_with_bcs_bytes(&elements)?;
473        Ok(hasher.finalize())
474    }
475}
476
477/// Type wrapping `QueueView` while memoizing the hash.
478pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;
479
480#[cfg(with_graphql)]
481mod graphql {
482    use std::borrow::Cow;
483
484    use super::QueueView;
485    use crate::{
486        context::Context,
487        graphql::{hash_name, mangle},
488    };
489
490    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
491        fn type_name() -> Cow<'static, str> {
492            format!(
493                "QueueView_{}_{:08x}",
494                mangle(T::type_name()),
495                hash_name::<T>()
496            )
497            .into()
498        }
499    }
500
501    #[async_graphql::Object(cache_control(no_cache), name_type)]
502    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
503    where
504        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
505    {
506        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
507            Ok(self
508                .read_front(count.unwrap_or_else(|| self.count()))
509                .await?)
510        }
511    }
512}