linera_views/views/
log_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    ops::{Bound, Range, RangeBounds},
7};
8
9use allocative::Allocative;
10use linera_base::data_types::ArithmeticError;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use serde::{de::DeserializeOwned, Serialize};
14
15use crate::{
16    batch::Batch,
17    common::{from_bytes_option_or_default, HasherOutput},
18    context::Context,
19    hashable_wrapper::WrappedHashableContainerView,
20    historical_hash_wrapper::HistoricallyHashableView,
21    store::ReadableKeyValueStore as _,
22    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
23};
24
25#[cfg(with_metrics)]
26mod metrics {
27    use std::sync::LazyLock;
28
29    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
30    use prometheus::HistogramVec;
31
32    /// The runtime of hash computation
33    pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
34        register_histogram_vec(
35            "log_view_hash_runtime",
36            "LogView hash runtime",
37            &[],
38            exponential_bucket_latencies(5.0),
39        )
40    });
41}
42
43/// Key tags to create the sub-keys of a `LogView` on top of the base key.
44#[repr(u8)]
45enum KeyTag {
46    /// Prefix for the storing of the variable `stored_count`.
47    Count = MIN_VIEW_TAG,
48    /// Prefix for the indices of the log.
49    Index,
50}
51
52/// A view that supports logging values of type `T`.
53#[derive(Debug, Allocative)]
54#[allocative(bound = "C, T: Allocative")]
55pub struct LogView<C, T> {
56    /// The view context.
57    #[allocative(skip)]
58    context: C,
59    /// Whether to clear storage before applying updates.
60    delete_storage_first: bool,
61    /// The number of entries persisted in storage.
62    stored_count: u32,
63    /// New values not yet persisted to storage.
64    new_values: Vec<T>,
65}
66
67impl<C, T> View for LogView<C, T>
68where
69    C: Context,
70    T: Send + Sync + Serialize,
71{
72    const NUM_INIT_KEYS: usize = 1;
73
74    type Context = C;
75
76    fn context(&self) -> C {
77        self.context.clone()
78    }
79
80    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
81        Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
82    }
83
84    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
85        let stored_count =
86            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
87        Ok(Self {
88            context,
89            delete_storage_first: false,
90            stored_count,
91            new_values: Vec::new(),
92        })
93    }
94
95    fn rollback(&mut self) {
96        self.delete_storage_first = false;
97        self.new_values.clear();
98    }
99
100    async fn has_pending_changes(&self) -> bool {
101        if self.delete_storage_first {
102            return true;
103        }
104        !self.new_values.is_empty()
105    }
106
107    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
108        let mut delete_view = false;
109        if self.delete_storage_first {
110            batch.delete_key_prefix(self.context.base_key().bytes.clone());
111            delete_view = true;
112        }
113        if !self.new_values.is_empty() {
114            delete_view = false;
115            let new_values_len =
116                u32::try_from(self.new_values.len()).map_err(|_| ArithmeticError::Overflow)?;
117            let new_count = self
118                .stored_count
119                .checked_add(new_values_len)
120                .ok_or(ArithmeticError::Overflow)?;
121            let mut index = self.stored_count;
122            for value in &self.new_values {
123                let key = self
124                    .context
125                    .base_key()
126                    .derive_tag_key(KeyTag::Index as u8, &index)?;
127                batch.put_key_value(key, value)?;
128                index += 1;
129            }
130            let key = self.context.base_key().base_tag(KeyTag::Count as u8);
131            batch.put_key_value(key, &new_count)?;
132        }
133        Ok(delete_view)
134    }
135
136    fn post_save(&mut self) {
137        if self.delete_storage_first {
138            self.stored_count = 0;
139        }
140        self.stored_count += u32::try_from(self.new_values.len()).expect("verified in pre_save");
141        self.new_values.clear();
142        self.delete_storage_first = false;
143    }
144
145    fn clear(&mut self) {
146        self.delete_storage_first = true;
147        self.new_values.clear();
148    }
149}
150
151impl<C, T> ClonableView for LogView<C, T>
152where
153    C: Context,
154    T: Clone + Send + Sync + Serialize,
155{
156    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
157        Ok(LogView {
158            context: self.context.clone(),
159            delete_storage_first: self.delete_storage_first,
160            stored_count: self.stored_count,
161            new_values: self.new_values.clone(),
162        })
163    }
164}
165
166impl<C, T> LogView<C, T>
167where
168    C: Context,
169{
170    /// Pushes a value to the end of the log.
171    /// ```rust
172    /// # tokio_test::block_on(async {
173    /// # use linera_views::context::MemoryContext;
174    /// # use linera_views::log_view::LogView;
175    /// # use linera_views::views::View;
176    /// # let context = MemoryContext::new_for_testing(());
177    /// let mut log = LogView::load(context).await.unwrap();
178    /// log.push(34);
179    /// # })
180    /// ```
181    pub fn push(&mut self, value: T) {
182        self.new_values.push(value);
183    }
184
185    /// Reads the size of the log.
186    /// ```rust
187    /// # tokio_test::block_on(async {
188    /// # use linera_views::context::MemoryContext;
189    /// # use linera_views::log_view::LogView;
190    /// # use linera_views::views::View;
191    /// # let context = MemoryContext::new_for_testing(());
192    /// let mut log = LogView::load(context).await.unwrap();
193    /// log.push(34);
194    /// log.push(42);
195    /// assert_eq!(log.count(), 2);
196    /// # })
197    /// ```
198    pub fn count(&self) -> usize {
199        if self.delete_storage_first {
200            self.new_values.len()
201        } else {
202            self.stored_count as usize + self.new_values.len()
203        }
204    }
205
206    /// Obtains the extra data.
207    pub fn extra(&self) -> &C::Extra {
208        self.context.extra()
209    }
210}
211
212impl<C, T> LogView<C, T>
213where
214    C: Context,
215    T: Clone + DeserializeOwned + Serialize + Send + Sync,
216{
217    /// Reads the logged value with the given index (including staged ones).
218    /// ```rust
219    /// # tokio_test::block_on(async {
220    /// # use linera_views::context::MemoryContext;
221    /// # use linera_views::log_view::LogView;
222    /// # use linera_views::views::View;
223    /// # let context = MemoryContext::new_for_testing(());
224    /// let mut log = LogView::load(context).await.unwrap();
225    /// log.push(34);
226    /// assert_eq!(log.get(0).await.unwrap(), Some(34));
227    /// # })
228    /// ```
229    pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
230        let stored_count = self.stored_count as usize;
231        let value = if self.delete_storage_first {
232            self.new_values.get(index).cloned()
233        } else if index < stored_count {
234            let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
235            let key = self
236                .context
237                .base_key()
238                .derive_tag_key(KeyTag::Index as u8, &index)?;
239            self.context.store().read_value(&key).await?
240        } else {
241            self.new_values.get(index - stored_count).cloned()
242        };
243        Ok(value)
244    }
245
246    /// Reads several logged keys (including staged ones)
247    /// ```rust
248    /// # tokio_test::block_on(async {
249    /// # use linera_views::context::MemoryContext;
250    /// # use linera_views::log_view::LogView;
251    /// # use linera_views::views::View;
252    /// # let context = MemoryContext::new_for_testing(());
253    /// let mut log = LogView::load(context).await.unwrap();
254    /// log.push(34);
255    /// log.push(42);
256    /// assert_eq!(
257    ///     log.multi_get(vec![0, 1]).await.unwrap(),
258    ///     vec![Some(34), Some(42)]
259    /// );
260    /// # })
261    /// ```
262    pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
263        let mut result = Vec::new();
264        if self.delete_storage_first {
265            for index in indices {
266                result.push(self.new_values.get(index).cloned());
267            }
268        } else {
269            let stored_count = self.stored_count as usize;
270            let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
271            for (pos, index) in indices.into_iter().enumerate() {
272                if index < stored_count {
273                    index_to_positions.entry(index).or_default().push(pos);
274                    result.push(None);
275                } else {
276                    result.push(self.new_values.get(index - stored_count).cloned());
277                }
278            }
279            let mut keys = Vec::new();
280            let mut vec_positions = Vec::new();
281            for (index, positions) in index_to_positions {
282                let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
283                let key = self
284                    .context
285                    .base_key()
286                    .derive_tag_key(KeyTag::Index as u8, &index)?;
287                keys.push(key);
288                vec_positions.push(positions);
289            }
290            let values = self.context.store().read_multi_values(&keys).await?;
291            for (positions, value) in vec_positions.into_iter().zip(values) {
292                if let Some((&last, rest)) = positions.split_last() {
293                    for &position in rest {
294                        *result.get_mut(position).unwrap() = value.clone();
295                    }
296                    *result.get_mut(last).unwrap() = value;
297                }
298            }
299        }
300        Ok(result)
301    }
302
303    /// Reads the index-value pairs at the given positions.
304    /// ```rust
305    /// # tokio_test::block_on(async {
306    /// # use linera_views::context::MemoryContext;
307    /// # use linera_views::log_view::LogView;
308    /// # use linera_views::views::View;
309    /// # let context = MemoryContext::new_for_testing(());
310    /// let mut log = LogView::load(context).await.unwrap();
311    /// log.push(34);
312    /// log.push(42);
313    /// assert_eq!(
314    ///     log.multi_get_pairs(vec![0, 1, 5]).await.unwrap(),
315    ///     vec![(0, Some(34)), (1, Some(42)), (5, None)]
316    /// );
317    /// # })
318    /// ```
319    pub async fn multi_get_pairs(
320        &self,
321        indices: Vec<usize>,
322    ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
323        let values = self.multi_get(indices.clone()).await?;
324        Ok(indices.into_iter().zip(values).collect())
325    }
326
327    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
328        let count = range.len();
329        let mut keys = Vec::with_capacity(count);
330        for index in range {
331            let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
332            let key = self
333                .context
334                .base_key()
335                .derive_tag_key(KeyTag::Index as u8, &index)?;
336            keys.push(key);
337        }
338        let mut values = Vec::with_capacity(count);
339        for entry in self.context.store().read_multi_values(&keys).await? {
340            match entry {
341                None => {
342                    return Err(ViewError::MissingEntries("LogView".into()));
343                }
344                Some(value) => values.push(value),
345            }
346        }
347        Ok(values)
348    }
349
350    /// Reads the logged values in the given range (including staged ones).
351    /// ```rust
352    /// # tokio_test::block_on(async {
353    /// # use linera_views::context::MemoryContext;
354    /// # use linera_views::log_view::LogView;
355    /// # use linera_views::views::View;
356    /// # let context = MemoryContext::new_for_testing(());
357    /// let mut log = LogView::load(context).await.unwrap();
358    /// log.push(34);
359    /// log.push(42);
360    /// log.push(56);
361    /// assert_eq!(log.read(0..2).await.unwrap(), vec![34, 42]);
362    /// # })
363    /// ```
364    pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
365    where
366        R: RangeBounds<usize>,
367    {
368        let effective_stored_count = if self.delete_storage_first {
369            0
370        } else {
371            self.stored_count as usize
372        };
373        let end = match range.end_bound() {
374            Bound::Included(end) => *end + 1,
375            Bound::Excluded(end) => *end,
376            Bound::Unbounded => self.count(),
377        }
378        .min(self.count());
379        let start = match range.start_bound() {
380            Bound::Included(start) => *start,
381            Bound::Excluded(start) => *start + 1,
382            Bound::Unbounded => 0,
383        };
384        if start >= end {
385            return Ok(Vec::new());
386        }
387        if start < effective_stored_count {
388            if end <= effective_stored_count {
389                self.read_context(start..end).await
390            } else {
391                let mut values = self.read_context(start..effective_stored_count).await?;
392                values.extend(
393                    self.new_values[0..(end - effective_stored_count)]
394                        .iter()
395                        .cloned(),
396                );
397                Ok(values)
398            }
399        } else {
400            Ok(
401                self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
402                    .to_vec(),
403            )
404        }
405    }
406}
407
408impl<C, T> HashableView for LogView<C, T>
409where
410    C: Context,
411    T: Send + Sync + Clone + Serialize + DeserializeOwned,
412{
413    type Hasher = sha3::Sha3_256;
414
415    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
416        self.hash().await
417    }
418
419    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
420        #[cfg(with_metrics)]
421        let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
422        let elements = self.read(..).await?;
423        let mut hasher = sha3::Sha3_256::default();
424        hasher.update_with_bcs_bytes(&elements)?;
425        Ok(hasher.finalize())
426    }
427}
428
429/// Type wrapping `LogView` while memoizing the hash.
430pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
431
432/// Wrapper around `LogView` to compute hashes based on the history of changes.
433pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
434
435#[cfg(not(web))]
436mod graphql {
437    use std::borrow::Cow;
438
439    use linera_base::data_types::ArithmeticError;
440
441    use super::LogView;
442    use crate::{
443        context::Context,
444        graphql::{hash_name, mangle},
445    };
446
447    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
448        fn type_name() -> Cow<'static, str> {
449            format!(
450                "LogView_{}_{:08x}",
451                mangle(T::type_name()),
452                hash_name::<T>()
453            )
454            .into()
455        }
456    }
457
458    #[async_graphql::Object(cache_control(no_cache), name_type)]
459    impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
460    where
461        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
462    {
463        #[graphql(derived(name = "count"))]
464        async fn count_(&self) -> Result<u32, async_graphql::Error> {
465            Ok(u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?)
466        }
467
468        async fn entries(
469            &self,
470            start: Option<usize>,
471            end: Option<usize>,
472        ) -> async_graphql::Result<Vec<T>> {
473            Ok(self
474                .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
475                .await?)
476        }
477    }
478}