linera_views/views/
log_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::ops::{Bound, Range, RangeBounds};
5
6#[cfg(with_metrics)]
7use linera_base::prometheus_util::MeasureLatency as _;
8use serde::{de::DeserializeOwned, Serialize};
9
10use crate::{
11    batch::Batch,
12    common::{from_bytes_option_or_default, HasherOutput},
13    context::Context,
14    hashable_wrapper::WrappedHashableContainerView,
15    store::ReadableKeyValueStore as _,
16    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
17};
18
19#[cfg(with_metrics)]
20mod metrics {
21    use std::sync::LazyLock;
22
23    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
24    use prometheus::HistogramVec;
25
26    /// The runtime of hash computation
27    pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
28        register_histogram_vec(
29            "log_view_hash_runtime",
30            "LogView hash runtime",
31            &[],
32            exponential_bucket_latencies(5.0),
33        )
34    });
35}
36
37/// Key tags to create the sub-keys of a `LogView` on top of the base key.
38#[repr(u8)]
39enum KeyTag {
40    /// Prefix for the storing of the variable `stored_count`.
41    Count = MIN_VIEW_TAG,
42    /// Prefix for the indices of the log.
43    Index,
44}
45
46/// A view that supports logging values of type `T`.
47#[derive(Debug)]
48pub struct LogView<C, T> {
49    context: C,
50    delete_storage_first: bool,
51    stored_count: usize,
52    new_values: Vec<T>,
53}
54
55impl<C, T> View for LogView<C, T>
56where
57    C: Context,
58    T: Send + Sync + Serialize,
59{
60    const NUM_INIT_KEYS: usize = 1;
61
62    type Context = C;
63
64    fn context(&self) -> &C {
65        &self.context
66    }
67
68    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
69        Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
70    }
71
72    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
73        let stored_count =
74            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
75        Ok(Self {
76            context,
77            delete_storage_first: false,
78            stored_count,
79            new_values: Vec::new(),
80        })
81    }
82
83    async fn load(context: C) -> Result<Self, ViewError> {
84        let keys = Self::pre_load(&context)?;
85        let values = context.store().read_multi_values_bytes(keys).await?;
86        Self::post_load(context, &values)
87    }
88
89    fn rollback(&mut self) {
90        self.delete_storage_first = false;
91        self.new_values.clear();
92    }
93
94    async fn has_pending_changes(&self) -> bool {
95        if self.delete_storage_first {
96            return true;
97        }
98        !self.new_values.is_empty()
99    }
100
101    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
102        let mut delete_view = false;
103        if self.delete_storage_first {
104            batch.delete_key_prefix(self.context.base_key().bytes.clone());
105            self.stored_count = 0;
106            delete_view = true;
107        }
108        if !self.new_values.is_empty() {
109            delete_view = false;
110            for value in &self.new_values {
111                let key = self
112                    .context
113                    .base_key()
114                    .derive_tag_key(KeyTag::Index as u8, &self.stored_count)?;
115                batch.put_key_value(key, value)?;
116                self.stored_count += 1;
117            }
118            let key = self.context.base_key().base_tag(KeyTag::Count as u8);
119            batch.put_key_value(key, &self.stored_count)?;
120            self.new_values.clear();
121        }
122        self.delete_storage_first = false;
123        Ok(delete_view)
124    }
125
126    fn clear(&mut self) {
127        self.delete_storage_first = true;
128        self.new_values.clear();
129    }
130}
131
132impl<C, T> ClonableView for LogView<C, T>
133where
134    C: Context,
135    T: Clone + Send + Sync + Serialize,
136{
137    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
138        Ok(LogView {
139            context: self.context.clone(),
140            delete_storage_first: self.delete_storage_first,
141            stored_count: self.stored_count,
142            new_values: self.new_values.clone(),
143        })
144    }
145}
146
147impl<C, T> LogView<C, T>
148where
149    C: Context,
150{
151    /// Pushes a value to the end of the log.
152    /// ```rust
153    /// # tokio_test::block_on(async {
154    /// # use linera_views::context::MemoryContext;
155    /// # use linera_views::log_view::LogView;
156    /// # use linera_views::views::View;
157    /// # let context = MemoryContext::new_for_testing(());
158    /// let mut log = LogView::load(context).await.unwrap();
159    /// log.push(34);
160    /// # })
161    /// ```
162    pub fn push(&mut self, value: T) {
163        self.new_values.push(value);
164    }
165
166    /// Reads the size of the log.
167    /// ```rust
168    /// # tokio_test::block_on(async {
169    /// # use linera_views::context::MemoryContext;
170    /// # use linera_views::log_view::LogView;
171    /// # use linera_views::views::View;
172    /// # let context = MemoryContext::new_for_testing(());
173    /// let mut log = LogView::load(context).await.unwrap();
174    /// log.push(34);
175    /// log.push(42);
176    /// assert_eq!(log.count(), 2);
177    /// # })
178    /// ```
179    pub fn count(&self) -> usize {
180        if self.delete_storage_first {
181            self.new_values.len()
182        } else {
183            self.stored_count + self.new_values.len()
184        }
185    }
186
187    /// Obtains the extra data.
188    pub fn extra(&self) -> &C::Extra {
189        self.context.extra()
190    }
191}
192
193impl<C, T> LogView<C, T>
194where
195    C: Context,
196    T: Clone + DeserializeOwned + Serialize + Send + Sync,
197{
198    /// Reads the logged value with the given index (including staged ones).
199    /// ```rust
200    /// # tokio_test::block_on(async {
201    /// # use linera_views::context::MemoryContext;
202    /// # use linera_views::log_view::LogView;
203    /// # use linera_views::views::View;
204    /// # let context = MemoryContext::new_for_testing(());
205    /// let mut log = LogView::load(context).await.unwrap();
206    /// log.push(34);
207    /// assert_eq!(log.get(0).await.unwrap(), Some(34));
208    /// # })
209    /// ```
210    pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
211        let value = if self.delete_storage_first {
212            self.new_values.get(index).cloned()
213        } else if index < self.stored_count {
214            let key = self
215                .context
216                .base_key()
217                .derive_tag_key(KeyTag::Index as u8, &index)?;
218            self.context.store().read_value(&key).await?
219        } else {
220            self.new_values.get(index - self.stored_count).cloned()
221        };
222        Ok(value)
223    }
224
225    /// Reads several logged keys (including staged ones)
226    /// ```rust
227    /// # tokio_test::block_on(async {
228    /// # use linera_views::context::MemoryContext;
229    /// # use linera_views::log_view::LogView;
230    /// # use linera_views::views::View;
231    /// # let context = MemoryContext::new_for_testing(());
232    /// let mut log = LogView::load(context).await.unwrap();
233    /// log.push(34);
234    /// log.push(42);
235    /// assert_eq!(
236    ///     log.multi_get(vec![0, 1]).await.unwrap(),
237    ///     vec![Some(34), Some(42)]
238    /// );
239    /// # })
240    /// ```
241    pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
242        let mut result = Vec::new();
243        if self.delete_storage_first {
244            for index in indices {
245                result.push(self.new_values.get(index).cloned());
246            }
247        } else {
248            let mut keys = Vec::new();
249            let mut positions = Vec::new();
250            for (pos, index) in indices.into_iter().enumerate() {
251                if index < self.stored_count {
252                    let key = self
253                        .context
254                        .base_key()
255                        .derive_tag_key(KeyTag::Index as u8, &index)?;
256                    keys.push(key);
257                    positions.push(pos);
258                    result.push(None);
259                } else {
260                    result.push(self.new_values.get(index - self.stored_count).cloned());
261                }
262            }
263            let values = self.context.store().read_multi_values(keys).await?;
264            for (pos, value) in positions.into_iter().zip(values) {
265                *result.get_mut(pos).unwrap() = value;
266            }
267        }
268        Ok(result)
269    }
270
271    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
272        let count = range.len();
273        let mut keys = Vec::with_capacity(count);
274        for index in range {
275            let key = self
276                .context
277                .base_key()
278                .derive_tag_key(KeyTag::Index as u8, &index)?;
279            keys.push(key);
280        }
281        let mut values = Vec::with_capacity(count);
282        for entry in self.context.store().read_multi_values(keys).await? {
283            match entry {
284                None => {
285                    return Err(ViewError::MissingEntries);
286                }
287                Some(value) => values.push(value),
288            }
289        }
290        Ok(values)
291    }
292
293    /// Reads the logged values in the given range (including staged ones).
294    /// ```rust
295    /// # tokio_test::block_on(async {
296    /// # use linera_views::context::MemoryContext;
297    /// # use linera_views::log_view::LogView;
298    /// # use linera_views::views::View;
299    /// # let context = MemoryContext::new_for_testing(());
300    /// let mut log = LogView::load(context).await.unwrap();
301    /// log.push(34);
302    /// log.push(42);
303    /// log.push(56);
304    /// assert_eq!(log.read(0..2).await.unwrap(), vec![34, 42]);
305    /// # })
306    /// ```
307    pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
308    where
309        R: RangeBounds<usize>,
310    {
311        let effective_stored_count = if self.delete_storage_first {
312            0
313        } else {
314            self.stored_count
315        };
316        let end = match range.end_bound() {
317            Bound::Included(end) => *end + 1,
318            Bound::Excluded(end) => *end,
319            Bound::Unbounded => self.count(),
320        }
321        .min(self.count());
322        let start = match range.start_bound() {
323            Bound::Included(start) => *start,
324            Bound::Excluded(start) => *start + 1,
325            Bound::Unbounded => 0,
326        };
327        if start >= end {
328            return Ok(Vec::new());
329        }
330        if start < effective_stored_count {
331            if end <= effective_stored_count {
332                self.read_context(start..end).await
333            } else {
334                let mut values = self.read_context(start..effective_stored_count).await?;
335                values.extend(
336                    self.new_values[0..(end - effective_stored_count)]
337                        .iter()
338                        .cloned(),
339                );
340                Ok(values)
341            }
342        } else {
343            Ok(
344                self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
345                    .to_vec(),
346            )
347        }
348    }
349}
350
351impl<C, T> HashableView for LogView<C, T>
352where
353    C: Context,
354    T: Send + Sync + Clone + Serialize + DeserializeOwned,
355{
356    type Hasher = sha3::Sha3_256;
357
358    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
359        self.hash().await
360    }
361
362    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
363        #[cfg(with_metrics)]
364        let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
365        let elements = self.read(..).await?;
366        let mut hasher = sha3::Sha3_256::default();
367        hasher.update_with_bcs_bytes(&elements)?;
368        Ok(hasher.finalize())
369    }
370}
371
372/// Type wrapping `LogView` while memoizing the hash.
373pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
374
375#[cfg(not(web))]
376mod graphql {
377    use std::borrow::Cow;
378
379    use super::LogView;
380    use crate::{
381        context::Context,
382        graphql::{hash_name, mangle},
383    };
384
385    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
386        fn type_name() -> Cow<'static, str> {
387            format!(
388                "LogView_{}_{:08x}",
389                mangle(T::type_name()),
390                hash_name::<T>()
391            )
392            .into()
393        }
394    }
395
396    #[async_graphql::Object(cache_control(no_cache), name_type)]
397    impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
398    where
399        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
400    {
401        async fn entries(
402            &self,
403            start: Option<usize>,
404            end: Option<usize>,
405        ) -> async_graphql::Result<Vec<T>> {
406            Ok(self
407                .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
408                .await?)
409        }
410    }
411}