Skip to main content

linera_views/views/
log_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    ops::{Bound, Range, RangeBounds},
7};
8
9use allocative::Allocative;
10use linera_base::data_types::ArithmeticError;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use serde::{de::DeserializeOwned, Serialize};
14
15use crate::{
16    batch::Batch,
17    common::{from_bytes_option_or_default, HasherOutput},
18    context::Context,
19    hashable_wrapper::WrappedHashableContainerView,
20    historical_hash_wrapper::HistoricallyHashableView,
21    store::ReadableKeyValueStore as _,
22    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
23};
24
25#[cfg(with_metrics)]
26mod metrics {
27    use std::sync::LazyLock;
28
29    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
30    use prometheus::HistogramVec;
31
32    /// The runtime of hash computation
33    pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
34        register_histogram_vec(
35            "log_view_hash_runtime",
36            "LogView hash runtime",
37            &[],
38            exponential_bucket_latencies(5.0),
39        )
40    });
41}
42
43/// Key tags to create the sub-keys of a `LogView` on top of the base key.
44#[repr(u8)]
45enum KeyTag {
46    /// Prefix for the storing of the variable `stored_count`.
47    Count = MIN_VIEW_TAG,
48    /// Prefix for the indices of the log.
49    Index,
50}
51
52/// A view that supports logging values of type `T`.
53#[derive(Debug, Allocative)]
54#[allocative(bound = "C, T: Allocative")]
55pub struct LogView<C, T> {
56    /// The view context.
57    #[allocative(skip)]
58    context: C,
59    /// Whether to clear storage before applying updates.
60    delete_storage_first: bool,
61    /// The number of entries persisted in storage.
62    stored_count: u32,
63    /// New values not yet persisted to storage.
64    new_values: Vec<T>,
65}
66
67impl<C, T> View for LogView<C, T>
68where
69    C: Context,
70    T: Send + Sync + Serialize,
71{
72    const NUM_INIT_KEYS: usize = 1;
73
74    type Context = C;
75
76    fn context(&self) -> C {
77        self.context.clone()
78    }
79
80    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
81        Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
82    }
83
84    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
85        let stored_count =
86            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
87        Ok(Self {
88            context,
89            delete_storage_first: false,
90            stored_count,
91            new_values: Vec::new(),
92        })
93    }
94
95    fn rollback(&mut self) {
96        self.delete_storage_first = false;
97        self.new_values.clear();
98    }
99
100    async fn has_pending_changes(&self) -> bool {
101        if self.delete_storage_first {
102            return true;
103        }
104        !self.new_values.is_empty()
105    }
106
107    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
108        let mut delete_view = false;
109        if self.delete_storage_first {
110            batch.delete_key_prefix(self.context.base_key().bytes.clone());
111            delete_view = true;
112        }
113        if !self.new_values.is_empty() {
114            delete_view = false;
115            let new_values_len =
116                u32::try_from(self.new_values.len()).map_err(|_| ArithmeticError::Overflow)?;
117            let new_count = self
118                .stored_count
119                .checked_add(new_values_len)
120                .ok_or(ArithmeticError::Overflow)?;
121            for (index, value) in (self.stored_count..).zip(&self.new_values) {
122                let key = self
123                    .context
124                    .base_key()
125                    .derive_tag_key(KeyTag::Index as u8, &index)?;
126                batch.put_key_value(key, value)?;
127            }
128            let key = self.context.base_key().base_tag(KeyTag::Count as u8);
129            batch.put_key_value(key, &new_count)?;
130        }
131        Ok(delete_view)
132    }
133
134    fn post_save(&mut self) {
135        if self.delete_storage_first {
136            self.stored_count = 0;
137        }
138        self.stored_count += u32::try_from(self.new_values.len()).expect("verified in pre_save");
139        self.new_values.clear();
140        self.delete_storage_first = false;
141    }
142
143    fn clear(&mut self) {
144        self.delete_storage_first = true;
145        self.new_values.clear();
146    }
147}
148
149impl<C, T> ClonableView for LogView<C, T>
150where
151    C: Context,
152    T: Clone + Send + Sync + Serialize,
153{
154    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
155        Ok(LogView {
156            context: self.context.clone(),
157            delete_storage_first: self.delete_storage_first,
158            stored_count: self.stored_count,
159            new_values: self.new_values.clone(),
160        })
161    }
162}
163
164impl<C, T> LogView<C, T>
165where
166    C: Context,
167{
168    /// Pushes a value to the end of the log.
169    /// ```rust
170    /// # tokio_test::block_on(async {
171    /// # use linera_views::context::MemoryContext;
172    /// # use linera_views::log_view::LogView;
173    /// # use linera_views::views::View;
174    /// # let context = MemoryContext::new_for_testing(());
175    /// let mut log = LogView::load(context).await.unwrap();
176    /// log.push(34);
177    /// # })
178    /// ```
179    pub fn push(&mut self, value: T) {
180        self.new_values.push(value);
181    }
182
183    /// Reads the size of the log.
184    /// ```rust
185    /// # tokio_test::block_on(async {
186    /// # use linera_views::context::MemoryContext;
187    /// # use linera_views::log_view::LogView;
188    /// # use linera_views::views::View;
189    /// # let context = MemoryContext::new_for_testing(());
190    /// let mut log = LogView::load(context).await.unwrap();
191    /// log.push(34);
192    /// log.push(42);
193    /// assert_eq!(log.count(), 2);
194    /// # })
195    /// ```
196    pub fn count(&self) -> usize {
197        if self.delete_storage_first {
198            self.new_values.len()
199        } else {
200            self.stored_count as usize + self.new_values.len()
201        }
202    }
203
204    /// Obtains the extra data.
205    pub fn extra(&self) -> &C::Extra {
206        self.context.extra()
207    }
208}
209
210impl<C, T> LogView<C, T>
211where
212    C: Context,
213    T: Clone + DeserializeOwned + Serialize + Send + Sync,
214{
215    /// Reads the logged value with the given index (including staged ones).
216    /// ```rust
217    /// # tokio_test::block_on(async {
218    /// # use linera_views::context::MemoryContext;
219    /// # use linera_views::log_view::LogView;
220    /// # use linera_views::views::View;
221    /// # let context = MemoryContext::new_for_testing(());
222    /// let mut log = LogView::load(context).await.unwrap();
223    /// log.push(34);
224    /// assert_eq!(log.get(0).await.unwrap(), Some(34));
225    /// # })
226    /// ```
227    pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
228        let stored_count = self.stored_count as usize;
229        let value = if self.delete_storage_first {
230            self.new_values.get(index).cloned()
231        } else if index < stored_count {
232            let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
233            let key = self
234                .context
235                .base_key()
236                .derive_tag_key(KeyTag::Index as u8, &index)?;
237            self.context.store().read_value(&key).await?
238        } else {
239            self.new_values.get(index - stored_count).cloned()
240        };
241        Ok(value)
242    }
243
244    /// Reads several logged keys (including staged ones)
245    /// ```rust
246    /// # tokio_test::block_on(async {
247    /// # use linera_views::context::MemoryContext;
248    /// # use linera_views::log_view::LogView;
249    /// # use linera_views::views::View;
250    /// # let context = MemoryContext::new_for_testing(());
251    /// let mut log = LogView::load(context).await.unwrap();
252    /// log.push(34);
253    /// log.push(42);
254    /// assert_eq!(
255    ///     log.multi_get(vec![0, 1]).await.unwrap(),
256    ///     vec![Some(34), Some(42)]
257    /// );
258    /// # })
259    /// ```
260    pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
261        let mut result = Vec::new();
262        if self.delete_storage_first {
263            for index in indices {
264                result.push(self.new_values.get(index).cloned());
265            }
266        } else {
267            let stored_count = self.stored_count as usize;
268            let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
269            for (pos, index) in indices.into_iter().enumerate() {
270                if index < stored_count {
271                    index_to_positions.entry(index).or_default().push(pos);
272                    result.push(None);
273                } else {
274                    result.push(self.new_values.get(index - stored_count).cloned());
275                }
276            }
277            let mut keys = Vec::new();
278            let mut vec_positions = Vec::new();
279            for (index, positions) in index_to_positions {
280                let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
281                let key = self
282                    .context
283                    .base_key()
284                    .derive_tag_key(KeyTag::Index as u8, &index)?;
285                keys.push(key);
286                vec_positions.push(positions);
287            }
288            let values = self.context.store().read_multi_values(&keys).await?;
289            for (positions, value) in vec_positions.into_iter().zip(values) {
290                if let Some((&last, rest)) = positions.split_last() {
291                    for &position in rest {
292                        *result.get_mut(position).unwrap() = value.clone();
293                    }
294                    *result.get_mut(last).unwrap() = value;
295                }
296            }
297        }
298        Ok(result)
299    }
300
301    /// Reads the index-value pairs at the given positions.
302    /// ```rust
303    /// # tokio_test::block_on(async {
304    /// # use linera_views::context::MemoryContext;
305    /// # use linera_views::log_view::LogView;
306    /// # use linera_views::views::View;
307    /// # let context = MemoryContext::new_for_testing(());
308    /// let mut log = LogView::load(context).await.unwrap();
309    /// log.push(34);
310    /// log.push(42);
311    /// assert_eq!(
312    ///     log.multi_get_pairs(vec![0, 1, 5]).await.unwrap(),
313    ///     vec![(0, Some(34)), (1, Some(42)), (5, None)]
314    /// );
315    /// # })
316    /// ```
317    pub async fn multi_get_pairs(
318        &self,
319        indices: Vec<usize>,
320    ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
321        let values = self.multi_get(indices.clone()).await?;
322        Ok(indices.into_iter().zip(values).collect())
323    }
324
325    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
326        let count = range.len();
327        let mut keys = Vec::with_capacity(count);
328        for index in range {
329            let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
330            let key = self
331                .context
332                .base_key()
333                .derive_tag_key(KeyTag::Index as u8, &index)?;
334            keys.push(key);
335        }
336        let mut values = Vec::with_capacity(count);
337        for entry in self.context.store().read_multi_values(&keys).await? {
338            match entry {
339                None => {
340                    return Err(ViewError::MissingEntries("LogView".into()));
341                }
342                Some(value) => values.push(value),
343            }
344        }
345        Ok(values)
346    }
347
348    /// Reads the logged values in the given range (including staged ones).
349    /// ```rust
350    /// # tokio_test::block_on(async {
351    /// # use linera_views::context::MemoryContext;
352    /// # use linera_views::log_view::LogView;
353    /// # use linera_views::views::View;
354    /// # let context = MemoryContext::new_for_testing(());
355    /// let mut log = LogView::load(context).await.unwrap();
356    /// log.push(34);
357    /// log.push(42);
358    /// log.push(56);
359    /// assert_eq!(log.read(0..2).await.unwrap(), vec![34, 42]);
360    /// # })
361    /// ```
362    pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
363    where
364        R: RangeBounds<usize>,
365    {
366        let effective_stored_count = if self.delete_storage_first {
367            0
368        } else {
369            self.stored_count as usize
370        };
371        let end = match range.end_bound() {
372            Bound::Included(end) => *end + 1,
373            Bound::Excluded(end) => *end,
374            Bound::Unbounded => self.count(),
375        }
376        .min(self.count());
377        let start = match range.start_bound() {
378            Bound::Included(start) => *start,
379            Bound::Excluded(start) => *start + 1,
380            Bound::Unbounded => 0,
381        };
382        if start >= end {
383            return Ok(Vec::new());
384        }
385        if start < effective_stored_count {
386            if end <= effective_stored_count {
387                self.read_context(start..end).await
388            } else {
389                let mut values = self.read_context(start..effective_stored_count).await?;
390                values.extend(
391                    self.new_values[0..(end - effective_stored_count)]
392                        .iter()
393                        .cloned(),
394                );
395                Ok(values)
396            }
397        } else {
398            Ok(
399                self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
400                    .to_vec(),
401            )
402        }
403    }
404}
405
406impl<C, T> HashableView for LogView<C, T>
407where
408    C: Context,
409    T: Send + Sync + Clone + Serialize + DeserializeOwned,
410{
411    type Hasher = sha3::Sha3_256;
412
413    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
414        self.hash().await
415    }
416
417    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
418        #[cfg(with_metrics)]
419        let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
420        let elements = self.read(..).await?;
421        let mut hasher = sha3::Sha3_256::default();
422        hasher.update_with_bcs_bytes(&elements)?;
423        Ok(hasher.finalize())
424    }
425}
426
427/// Type wrapping `LogView` while memoizing the hash.
428pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
429
430/// Wrapper around `LogView` to compute hashes based on the history of changes.
431pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
432
433#[cfg(not(web))]
434mod graphql {
435    use std::borrow::Cow;
436
437    use linera_base::data_types::ArithmeticError;
438
439    use super::LogView;
440    use crate::{
441        context::Context,
442        graphql::{hash_name, mangle},
443    };
444
445    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
446        fn type_name() -> Cow<'static, str> {
447            format!(
448                "LogView_{}_{:08x}",
449                mangle(T::type_name()),
450                hash_name::<T>()
451            )
452            .into()
453        }
454    }
455
456    #[async_graphql::Object(cache_control(no_cache), name_type)]
457    impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
458    where
459        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
460    {
461        #[graphql(derived(name = "count"))]
462        async fn count_(&self) -> Result<u32, async_graphql::Error> {
463            Ok(u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?)
464        }
465
466        async fn entries(
467            &self,
468            start: Option<usize>,
469            end: Option<usize>,
470        ) -> async_graphql::Result<Vec<T>> {
471            Ok(self
472                .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
473                .await?)
474        }
475    }
476}