linera_views/views/
log_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    ops::{Bound, Range, RangeBounds},
7};
8
9#[cfg(with_metrics)]
10use linera_base::prometheus_util::MeasureLatency as _;
11use serde::{de::DeserializeOwned, Serialize};
12
13use crate::{
14    batch::Batch,
15    common::{from_bytes_option_or_default, HasherOutput},
16    context::Context,
17    hashable_wrapper::WrappedHashableContainerView,
18    historical_hash_wrapper::HistoricallyHashableView,
19    store::ReadableKeyValueStore as _,
20    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
21};
22
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram recording how long each `LogView` hash computation takes.
    ///
    /// Registered lazily on first access, with no labels.
    pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_latencies(5.0);
        register_histogram_vec(
            "log_view_hash_runtime",
            "LogView hash runtime",
            &[],
            buckets,
        )
    });
}
40
41/// Key tags to create the sub-keys of a `LogView` on top of the base key.
42#[repr(u8)]
43enum KeyTag {
44    /// Prefix for the storing of the variable `stored_count`.
45    Count = MIN_VIEW_TAG,
46    /// Prefix for the indices of the log.
47    Index,
48}
49
/// An append-only log of values of type `T`, exposed as a view.
///
/// Pushed values are staged in memory and only written to the batch when the
/// view is flushed.
#[derive(Debug)]
pub struct LogView<C, T> {
    /// Context used to derive the keys and to reach the underlying store.
    context: C,
    /// Set by `clear`; when `true`, the next flush first deletes everything
    /// stored under the base key.
    delete_storage_first: bool,
    /// Number of entries recorded as persisted in storage.
    stored_count: usize,
    /// Values pushed since the last flush, in insertion order.
    new_values: Vec<T>,
}
58
59impl<C, T> View for LogView<C, T>
60where
61    C: Context,
62    T: Send + Sync + Serialize,
63{
64    const NUM_INIT_KEYS: usize = 1;
65
66    type Context = C;
67
68    fn context(&self) -> &C {
69        &self.context
70    }
71
72    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
73        Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
74    }
75
76    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
77        let stored_count =
78            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
79        Ok(Self {
80            context,
81            delete_storage_first: false,
82            stored_count,
83            new_values: Vec::new(),
84        })
85    }
86
87    fn rollback(&mut self) {
88        self.delete_storage_first = false;
89        self.new_values.clear();
90    }
91
92    async fn has_pending_changes(&self) -> bool {
93        if self.delete_storage_first {
94            return true;
95        }
96        !self.new_values.is_empty()
97    }
98
99    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
100        let mut delete_view = false;
101        if self.delete_storage_first {
102            batch.delete_key_prefix(self.context.base_key().bytes.clone());
103            self.stored_count = 0;
104            delete_view = true;
105        }
106        if !self.new_values.is_empty() {
107            delete_view = false;
108            for value in &self.new_values {
109                let key = self
110                    .context
111                    .base_key()
112                    .derive_tag_key(KeyTag::Index as u8, &self.stored_count)?;
113                batch.put_key_value(key, value)?;
114                self.stored_count += 1;
115            }
116            let key = self.context.base_key().base_tag(KeyTag::Count as u8);
117            batch.put_key_value(key, &self.stored_count)?;
118            self.new_values.clear();
119        }
120        self.delete_storage_first = false;
121        Ok(delete_view)
122    }
123
124    fn clear(&mut self) {
125        self.delete_storage_first = true;
126        self.new_values.clear();
127    }
128}
129
130impl<C, T> ClonableView for LogView<C, T>
131where
132    C: Context,
133    T: Clone + Send + Sync + Serialize,
134{
135    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
136        Ok(LogView {
137            context: self.context.clone(),
138            delete_storage_first: self.delete_storage_first,
139            stored_count: self.stored_count,
140            new_values: self.new_values.clone(),
141        })
142    }
143}
144
145impl<C, T> LogView<C, T>
146where
147    C: Context,
148{
149    /// Pushes a value to the end of the log.
150    /// ```rust
151    /// # tokio_test::block_on(async {
152    /// # use linera_views::context::MemoryContext;
153    /// # use linera_views::log_view::LogView;
154    /// # use linera_views::views::View;
155    /// # let context = MemoryContext::new_for_testing(());
156    /// let mut log = LogView::load(context).await.unwrap();
157    /// log.push(34);
158    /// # })
159    /// ```
160    pub fn push(&mut self, value: T) {
161        self.new_values.push(value);
162    }
163
164    /// Reads the size of the log.
165    /// ```rust
166    /// # tokio_test::block_on(async {
167    /// # use linera_views::context::MemoryContext;
168    /// # use linera_views::log_view::LogView;
169    /// # use linera_views::views::View;
170    /// # let context = MemoryContext::new_for_testing(());
171    /// let mut log = LogView::load(context).await.unwrap();
172    /// log.push(34);
173    /// log.push(42);
174    /// assert_eq!(log.count(), 2);
175    /// # })
176    /// ```
177    pub fn count(&self) -> usize {
178        if self.delete_storage_first {
179            self.new_values.len()
180        } else {
181            self.stored_count + self.new_values.len()
182        }
183    }
184
185    /// Obtains the extra data.
186    pub fn extra(&self) -> &C::Extra {
187        self.context.extra()
188    }
189}
190
191impl<C, T> LogView<C, T>
192where
193    C: Context,
194    T: Clone + DeserializeOwned + Serialize + Send + Sync,
195{
196    /// Reads the logged value with the given index (including staged ones).
197    /// ```rust
198    /// # tokio_test::block_on(async {
199    /// # use linera_views::context::MemoryContext;
200    /// # use linera_views::log_view::LogView;
201    /// # use linera_views::views::View;
202    /// # let context = MemoryContext::new_for_testing(());
203    /// let mut log = LogView::load(context).await.unwrap();
204    /// log.push(34);
205    /// assert_eq!(log.get(0).await.unwrap(), Some(34));
206    /// # })
207    /// ```
208    pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
209        let value = if self.delete_storage_first {
210            self.new_values.get(index).cloned()
211        } else if index < self.stored_count {
212            let key = self
213                .context
214                .base_key()
215                .derive_tag_key(KeyTag::Index as u8, &index)?;
216            self.context.store().read_value(&key).await?
217        } else {
218            self.new_values.get(index - self.stored_count).cloned()
219        };
220        Ok(value)
221    }
222
223    /// Reads several logged keys (including staged ones)
224    /// ```rust
225    /// # tokio_test::block_on(async {
226    /// # use linera_views::context::MemoryContext;
227    /// # use linera_views::log_view::LogView;
228    /// # use linera_views::views::View;
229    /// # let context = MemoryContext::new_for_testing(());
230    /// let mut log = LogView::load(context).await.unwrap();
231    /// log.push(34);
232    /// log.push(42);
233    /// assert_eq!(
234    ///     log.multi_get(vec![0, 1]).await.unwrap(),
235    ///     vec![Some(34), Some(42)]
236    /// );
237    /// # })
238    /// ```
239    pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
240        let mut result = Vec::new();
241        if self.delete_storage_first {
242            for index in indices {
243                result.push(self.new_values.get(index).cloned());
244            }
245        } else {
246            let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
247            for (pos, index) in indices.into_iter().enumerate() {
248                if index < self.stored_count {
249                    index_to_positions.entry(index).or_default().push(pos);
250                    result.push(None);
251                } else {
252                    result.push(self.new_values.get(index - self.stored_count).cloned());
253                }
254            }
255            let mut keys = Vec::new();
256            let mut vec_positions = Vec::new();
257            for (index, positions) in index_to_positions {
258                let key = self
259                    .context
260                    .base_key()
261                    .derive_tag_key(KeyTag::Index as u8, &index)?;
262                keys.push(key);
263                vec_positions.push(positions);
264            }
265            let values = self.context.store().read_multi_values(keys).await?;
266            for (positions, value) in vec_positions.into_iter().zip(values) {
267                if let Some((&last, rest)) = positions.split_last() {
268                    for &position in rest {
269                        *result.get_mut(position).unwrap() = value.clone();
270                    }
271                    *result.get_mut(last).unwrap() = value;
272                }
273            }
274        }
275        Ok(result)
276    }
277
278    /// Reads the index-value pairs at the given positions.
279    /// ```rust
280    /// # tokio_test::block_on(async {
281    /// # use linera_views::context::MemoryContext;
282    /// # use linera_views::log_view::LogView;
283    /// # use linera_views::views::View;
284    /// # let context = MemoryContext::new_for_testing(());
285    /// let mut log = LogView::load(context).await.unwrap();
286    /// log.push(34);
287    /// log.push(42);
288    /// assert_eq!(
289    ///     log.multi_get_pairs(vec![0, 1, 5]).await.unwrap(),
290    ///     vec![(0, Some(34)), (1, Some(42)), (5, None)]
291    /// );
292    /// # })
293    /// ```
294    pub async fn multi_get_pairs(
295        &self,
296        indices: Vec<usize>,
297    ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
298        let values = self.multi_get(indices.clone()).await?;
299        Ok(indices.into_iter().zip(values).collect())
300    }
301
302    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
303        let count = range.len();
304        let mut keys = Vec::with_capacity(count);
305        for index in range {
306            let key = self
307                .context
308                .base_key()
309                .derive_tag_key(KeyTag::Index as u8, &index)?;
310            keys.push(key);
311        }
312        let mut values = Vec::with_capacity(count);
313        for entry in self.context.store().read_multi_values(keys).await? {
314            match entry {
315                None => {
316                    let root_key = self.context.store().root_key()?;
317                    return Err(ViewError::MissingEntries(root_key));
318                }
319                Some(value) => values.push(value),
320            }
321        }
322        Ok(values)
323    }
324
325    /// Reads the logged values in the given range (including staged ones).
326    /// ```rust
327    /// # tokio_test::block_on(async {
328    /// # use linera_views::context::MemoryContext;
329    /// # use linera_views::log_view::LogView;
330    /// # use linera_views::views::View;
331    /// # let context = MemoryContext::new_for_testing(());
332    /// let mut log = LogView::load(context).await.unwrap();
333    /// log.push(34);
334    /// log.push(42);
335    /// log.push(56);
336    /// assert_eq!(log.read(0..2).await.unwrap(), vec![34, 42]);
337    /// # })
338    /// ```
339    pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
340    where
341        R: RangeBounds<usize>,
342    {
343        let effective_stored_count = if self.delete_storage_first {
344            0
345        } else {
346            self.stored_count
347        };
348        let end = match range.end_bound() {
349            Bound::Included(end) => *end + 1,
350            Bound::Excluded(end) => *end,
351            Bound::Unbounded => self.count(),
352        }
353        .min(self.count());
354        let start = match range.start_bound() {
355            Bound::Included(start) => *start,
356            Bound::Excluded(start) => *start + 1,
357            Bound::Unbounded => 0,
358        };
359        if start >= end {
360            return Ok(Vec::new());
361        }
362        if start < effective_stored_count {
363            if end <= effective_stored_count {
364                self.read_context(start..end).await
365            } else {
366                let mut values = self.read_context(start..effective_stored_count).await?;
367                values.extend(
368                    self.new_values[0..(end - effective_stored_count)]
369                        .iter()
370                        .cloned(),
371                );
372                Ok(values)
373            }
374        } else {
375            Ok(
376                self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
377                    .to_vec(),
378            )
379        }
380    }
381}
382
383impl<C, T> HashableView for LogView<C, T>
384where
385    C: Context,
386    T: Send + Sync + Clone + Serialize + DeserializeOwned,
387{
388    type Hasher = sha3::Sha3_256;
389
390    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
391        self.hash().await
392    }
393
394    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
395        #[cfg(with_metrics)]
396        let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
397        let elements = self.read(..).await?;
398        let mut hasher = sha3::Sha3_256::default();
399        hasher.update_with_bcs_bytes(&elements)?;
400        Ok(hasher.finalize())
401    }
402}
403
404/// Type wrapping `LogView` while memoizing the hash.
405pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
406
407/// Wrapper around `LogView` to compute hashes based on the history of changes.
408pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
409
410#[cfg(not(web))]
411mod graphql {
412    use std::borrow::Cow;
413
414    use super::LogView;
415    use crate::{
416        context::Context,
417        graphql::{hash_name, mangle},
418    };
419
420    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
421        fn type_name() -> Cow<'static, str> {
422            format!(
423                "LogView_{}_{:08x}",
424                mangle(T::type_name()),
425                hash_name::<T>()
426            )
427            .into()
428        }
429    }
430
431    #[async_graphql::Object(cache_control(no_cache), name_type)]
432    impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
433    where
434        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
435    {
436        #[graphql(derived(name = "count"))]
437        async fn count_(&self) -> Result<u32, async_graphql::Error> {
438            Ok(self.count() as u32)
439        }
440
441        async fn entries(
442            &self,
443            start: Option<usize>,
444            end: Option<usize>,
445        ) -> async_graphql::Result<Vec<T>> {
446            Ok(self
447                .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
448                .await?)
449        }
450    }
451}