linera_views/views/
log_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    ops::{Bound, Range, RangeBounds},
7};
8
9use allocative::Allocative;
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use serde::{de::DeserializeOwned, Serialize};
13
14use crate::{
15    batch::Batch,
16    common::{from_bytes_option_or_default, HasherOutput},
17    context::Context,
18    hashable_wrapper::WrappedHashableContainerView,
19    historical_hash_wrapper::HistoricallyHashableView,
20    store::ReadableKeyValueStore as _,
21    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
22};
23
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::HistogramVec;

    /// Latency histogram for `LogView` hash computations, registered lazily on first use.
    pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        let name = "log_view_hash_runtime";
        let description = "LogView hash runtime";
        let buckets = prometheus_util::exponential_bucket_latencies(5.0);
        prometheus_util::register_histogram_vec(name, description, &[], buckets)
    });
}
41
42/// Key tags to create the sub-keys of a `LogView` on top of the base key.
43#[repr(u8)]
44enum KeyTag {
45    /// Prefix for the storing of the variable `stored_count`.
46    Count = MIN_VIEW_TAG,
47    /// Prefix for the indices of the log.
48    Index,
49}
50
51/// A view that supports logging values of type `T`.
52#[derive(Debug, Allocative)]
53#[allocative(bound = "C, T: Allocative")]
54pub struct LogView<C, T> {
55    /// The view context.
56    #[allocative(skip)]
57    context: C,
58    /// Whether to clear storage before applying updates.
59    delete_storage_first: bool,
60    /// The number of entries persisted in storage.
61    stored_count: usize,
62    /// New values not yet persisted to storage.
63    new_values: Vec<T>,
64}
65
66impl<C, T> View for LogView<C, T>
67where
68    C: Context,
69    T: Send + Sync + Serialize,
70{
71    const NUM_INIT_KEYS: usize = 1;
72
73    type Context = C;
74
75    fn context(&self) -> C {
76        self.context.clone()
77    }
78
79    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
80        Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
81    }
82
83    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
84        let stored_count =
85            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
86        Ok(Self {
87            context,
88            delete_storage_first: false,
89            stored_count,
90            new_values: Vec::new(),
91        })
92    }
93
94    fn rollback(&mut self) {
95        self.delete_storage_first = false;
96        self.new_values.clear();
97    }
98
99    async fn has_pending_changes(&self) -> bool {
100        if self.delete_storage_first {
101            return true;
102        }
103        !self.new_values.is_empty()
104    }
105
106    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
107        let mut delete_view = false;
108        if self.delete_storage_first {
109            batch.delete_key_prefix(self.context.base_key().bytes.clone());
110            delete_view = true;
111        }
112        if !self.new_values.is_empty() {
113            delete_view = false;
114            let mut count = self.stored_count;
115            for value in &self.new_values {
116                let key = self
117                    .context
118                    .base_key()
119                    .derive_tag_key(KeyTag::Index as u8, &count)?;
120                batch.put_key_value(key, value)?;
121                count += 1;
122            }
123            let key = self.context.base_key().base_tag(KeyTag::Count as u8);
124            batch.put_key_value(key, &count)?;
125        }
126        Ok(delete_view)
127    }
128
129    fn post_save(&mut self) {
130        if self.delete_storage_first {
131            self.stored_count = 0;
132        }
133        self.stored_count += self.new_values.len();
134        self.new_values.clear();
135        self.delete_storage_first = false;
136    }
137
138    fn clear(&mut self) {
139        self.delete_storage_first = true;
140        self.new_values.clear();
141    }
142}
143
144impl<C, T> ClonableView for LogView<C, T>
145where
146    C: Context,
147    T: Clone + Send + Sync + Serialize,
148{
149    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
150        Ok(LogView {
151            context: self.context.clone(),
152            delete_storage_first: self.delete_storage_first,
153            stored_count: self.stored_count,
154            new_values: self.new_values.clone(),
155        })
156    }
157}
158
159impl<C, T> LogView<C, T>
160where
161    C: Context,
162{
163    /// Pushes a value to the end of the log.
164    /// ```rust
165    /// # tokio_test::block_on(async {
166    /// # use linera_views::context::MemoryContext;
167    /// # use linera_views::log_view::LogView;
168    /// # use linera_views::views::View;
169    /// # let context = MemoryContext::new_for_testing(());
170    /// let mut log = LogView::load(context).await.unwrap();
171    /// log.push(34);
172    /// # })
173    /// ```
174    pub fn push(&mut self, value: T) {
175        self.new_values.push(value);
176    }
177
178    /// Reads the size of the log.
179    /// ```rust
180    /// # tokio_test::block_on(async {
181    /// # use linera_views::context::MemoryContext;
182    /// # use linera_views::log_view::LogView;
183    /// # use linera_views::views::View;
184    /// # let context = MemoryContext::new_for_testing(());
185    /// let mut log = LogView::load(context).await.unwrap();
186    /// log.push(34);
187    /// log.push(42);
188    /// assert_eq!(log.count(), 2);
189    /// # })
190    /// ```
191    pub fn count(&self) -> usize {
192        if self.delete_storage_first {
193            self.new_values.len()
194        } else {
195            self.stored_count + self.new_values.len()
196        }
197    }
198
199    /// Obtains the extra data.
200    pub fn extra(&self) -> &C::Extra {
201        self.context.extra()
202    }
203}
204
205impl<C, T> LogView<C, T>
206where
207    C: Context,
208    T: Clone + DeserializeOwned + Serialize + Send + Sync,
209{
210    /// Reads the logged value with the given index (including staged ones).
211    /// ```rust
212    /// # tokio_test::block_on(async {
213    /// # use linera_views::context::MemoryContext;
214    /// # use linera_views::log_view::LogView;
215    /// # use linera_views::views::View;
216    /// # let context = MemoryContext::new_for_testing(());
217    /// let mut log = LogView::load(context).await.unwrap();
218    /// log.push(34);
219    /// assert_eq!(log.get(0).await.unwrap(), Some(34));
220    /// # })
221    /// ```
222    pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
223        let value = if self.delete_storage_first {
224            self.new_values.get(index).cloned()
225        } else if index < self.stored_count {
226            let key = self
227                .context
228                .base_key()
229                .derive_tag_key(KeyTag::Index as u8, &index)?;
230            self.context.store().read_value(&key).await?
231        } else {
232            self.new_values.get(index - self.stored_count).cloned()
233        };
234        Ok(value)
235    }
236
237    /// Reads several logged keys (including staged ones)
238    /// ```rust
239    /// # tokio_test::block_on(async {
240    /// # use linera_views::context::MemoryContext;
241    /// # use linera_views::log_view::LogView;
242    /// # use linera_views::views::View;
243    /// # let context = MemoryContext::new_for_testing(());
244    /// let mut log = LogView::load(context).await.unwrap();
245    /// log.push(34);
246    /// log.push(42);
247    /// assert_eq!(
248    ///     log.multi_get(vec![0, 1]).await.unwrap(),
249    ///     vec![Some(34), Some(42)]
250    /// );
251    /// # })
252    /// ```
253    pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
254        let mut result = Vec::new();
255        if self.delete_storage_first {
256            for index in indices {
257                result.push(self.new_values.get(index).cloned());
258            }
259        } else {
260            let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
261            for (pos, index) in indices.into_iter().enumerate() {
262                if index < self.stored_count {
263                    index_to_positions.entry(index).or_default().push(pos);
264                    result.push(None);
265                } else {
266                    result.push(self.new_values.get(index - self.stored_count).cloned());
267                }
268            }
269            let mut keys = Vec::new();
270            let mut vec_positions = Vec::new();
271            for (index, positions) in index_to_positions {
272                let key = self
273                    .context
274                    .base_key()
275                    .derive_tag_key(KeyTag::Index as u8, &index)?;
276                keys.push(key);
277                vec_positions.push(positions);
278            }
279            let values = self.context.store().read_multi_values(&keys).await?;
280            for (positions, value) in vec_positions.into_iter().zip(values) {
281                if let Some((&last, rest)) = positions.split_last() {
282                    for &position in rest {
283                        *result.get_mut(position).unwrap() = value.clone();
284                    }
285                    *result.get_mut(last).unwrap() = value;
286                }
287            }
288        }
289        Ok(result)
290    }
291
292    /// Reads the index-value pairs at the given positions.
293    /// ```rust
294    /// # tokio_test::block_on(async {
295    /// # use linera_views::context::MemoryContext;
296    /// # use linera_views::log_view::LogView;
297    /// # use linera_views::views::View;
298    /// # let context = MemoryContext::new_for_testing(());
299    /// let mut log = LogView::load(context).await.unwrap();
300    /// log.push(34);
301    /// log.push(42);
302    /// assert_eq!(
303    ///     log.multi_get_pairs(vec![0, 1, 5]).await.unwrap(),
304    ///     vec![(0, Some(34)), (1, Some(42)), (5, None)]
305    /// );
306    /// # })
307    /// ```
308    pub async fn multi_get_pairs(
309        &self,
310        indices: Vec<usize>,
311    ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
312        let values = self.multi_get(indices.clone()).await?;
313        Ok(indices.into_iter().zip(values).collect())
314    }
315
316    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
317        let count = range.len();
318        let mut keys = Vec::with_capacity(count);
319        for index in range {
320            let key = self
321                .context
322                .base_key()
323                .derive_tag_key(KeyTag::Index as u8, &index)?;
324            keys.push(key);
325        }
326        let mut values = Vec::with_capacity(count);
327        for entry in self.context.store().read_multi_values(&keys).await? {
328            match entry {
329                None => {
330                    return Err(ViewError::MissingEntries("LogView".into()));
331                }
332                Some(value) => values.push(value),
333            }
334        }
335        Ok(values)
336    }
337
338    /// Reads the logged values in the given range (including staged ones).
339    /// ```rust
340    /// # tokio_test::block_on(async {
341    /// # use linera_views::context::MemoryContext;
342    /// # use linera_views::log_view::LogView;
343    /// # use linera_views::views::View;
344    /// # let context = MemoryContext::new_for_testing(());
345    /// let mut log = LogView::load(context).await.unwrap();
346    /// log.push(34);
347    /// log.push(42);
348    /// log.push(56);
349    /// assert_eq!(log.read(0..2).await.unwrap(), vec![34, 42]);
350    /// # })
351    /// ```
352    pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
353    where
354        R: RangeBounds<usize>,
355    {
356        let effective_stored_count = if self.delete_storage_first {
357            0
358        } else {
359            self.stored_count
360        };
361        let end = match range.end_bound() {
362            Bound::Included(end) => *end + 1,
363            Bound::Excluded(end) => *end,
364            Bound::Unbounded => self.count(),
365        }
366        .min(self.count());
367        let start = match range.start_bound() {
368            Bound::Included(start) => *start,
369            Bound::Excluded(start) => *start + 1,
370            Bound::Unbounded => 0,
371        };
372        if start >= end {
373            return Ok(Vec::new());
374        }
375        if start < effective_stored_count {
376            if end <= effective_stored_count {
377                self.read_context(start..end).await
378            } else {
379                let mut values = self.read_context(start..effective_stored_count).await?;
380                values.extend(
381                    self.new_values[0..(end - effective_stored_count)]
382                        .iter()
383                        .cloned(),
384                );
385                Ok(values)
386            }
387        } else {
388            Ok(
389                self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
390                    .to_vec(),
391            )
392        }
393    }
394}
395
396impl<C, T> HashableView for LogView<C, T>
397where
398    C: Context,
399    T: Send + Sync + Clone + Serialize + DeserializeOwned,
400{
401    type Hasher = sha3::Sha3_256;
402
403    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
404        self.hash().await
405    }
406
407    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
408        #[cfg(with_metrics)]
409        let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
410        let elements = self.read(..).await?;
411        let mut hasher = sha3::Sha3_256::default();
412        hasher.update_with_bcs_bytes(&elements)?;
413        Ok(hasher.finalize())
414    }
415}
416
417/// Type wrapping `LogView` while memoizing the hash.
418pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
419
420/// Wrapper around `LogView` to compute hashes based on the history of changes.
421pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
422
423#[cfg(not(web))]
424mod graphql {
425    use std::borrow::Cow;
426
427    use super::LogView;
428    use crate::{
429        context::Context,
430        graphql::{hash_name, mangle},
431    };
432
433    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
434        fn type_name() -> Cow<'static, str> {
435            format!(
436                "LogView_{}_{:08x}",
437                mangle(T::type_name()),
438                hash_name::<T>()
439            )
440            .into()
441        }
442    }
443
444    #[async_graphql::Object(cache_control(no_cache), name_type)]
445    impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
446    where
447        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
448    {
449        #[graphql(derived(name = "count"))]
450        async fn count_(&self) -> Result<u32, async_graphql::Error> {
451            Ok(self.count() as u32)
452        }
453
454        async fn entries(
455            &self,
456            start: Option<usize>,
457            end: Option<usize>,
458        ) -> async_graphql::Result<Vec<T>> {
459            Ok(self
460                .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
461                .await?)
462        }
463    }
464}