linera_views/views/
historical_hash_wrapper.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    marker::PhantomData,
6    ops::{Deref, DerefMut},
7};
8
9#[cfg(with_metrics)]
10use linera_base::prometheus_util::MeasureLatency as _;
11
12use crate::{
13    batch::Batch,
14    common::from_bytes_option,
15    context::Context,
16    store::ReadableKeyValueStore as _,
17    views::{ClonableView, Hasher, HasherOutput, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
18};
19
20#[cfg(with_metrics)]
21mod metrics {
22    use std::sync::LazyLock;
23
24    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
25    use prometheus::HistogramVec;
26
27    /// The runtime of hash computation
28    pub static HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
29        LazyLock::new(|| {
30            register_histogram_vec(
31                "historically_hashable_view_hash_runtime",
32                "HistoricallyHashableView hash runtime",
33                &[],
34                exponential_bucket_latencies(5.0),
35            )
36        });
37}
38
39/// Wrapper to compute the hash of the view based on its history of modifications.
40#[derive(Debug)]
41pub struct HistoricallyHashableView<C, W> {
42    /// The hash in storage.
43    stored_hash: Option<HasherOutput>,
44    /// The inner view.
45    inner: W,
46    /// Memoized hash, if any.
47    hash: Option<HasherOutput>,
48    /// Track context type.
49    _phantom: PhantomData<C>,
50}
51
52/// Key tags to create the sub-keys of a `HistoricallyHashableView` on top of the base key.
53#[repr(u8)]
54enum KeyTag {
55    /// Prefix for the indices of the view.
56    Inner = MIN_VIEW_TAG,
57    /// Prefix for the hash.
58    Hash,
59}
60
61impl<C, W> HistoricallyHashableView<C, W> {
62    fn make_hash(
63        stored_hash: Option<HasherOutput>,
64        batch: &Batch,
65    ) -> Result<HasherOutput, ViewError> {
66        #[cfg(with_metrics)]
67        let _hash_latency = metrics::HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME.measure_latency();
68        let stored_hash = stored_hash.unwrap_or_default();
69        if batch.is_empty() {
70            return Ok(stored_hash);
71        }
72        let mut hasher = sha3::Sha3_256::default();
73        hasher.update_with_bytes(&stored_hash)?;
74        hasher.update_with_bcs_bytes(&batch)?;
75        Ok(hasher.finalize())
76    }
77}
78
79impl<C, W, C2> ReplaceContext<C2> for HistoricallyHashableView<C, W>
80where
81    W: View<Context = C> + ReplaceContext<C2>,
82    C: Context,
83    C2: Context,
84{
85    type Target = HistoricallyHashableView<C2, <W as ReplaceContext<C2>>::Target>;
86
87    async fn with_context(
88        &mut self,
89        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
90    ) -> Self::Target {
91        HistoricallyHashableView {
92            _phantom: PhantomData,
93            stored_hash: self.stored_hash,
94            hash: self.hash,
95            inner: self.inner.with_context(ctx).await,
96        }
97    }
98}
99
100impl<W> View for HistoricallyHashableView<W::Context, W>
101where
102    W: View,
103{
104    const NUM_INIT_KEYS: usize = 1 + W::NUM_INIT_KEYS;
105
106    type Context = W::Context;
107
108    fn context(&self) -> &Self::Context {
109        self.inner.context()
110    }
111
112    fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
113        let mut v = vec![context.base_key().base_tag(KeyTag::Hash as u8)];
114        let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
115        let context = context.clone_with_base_key(base_key);
116        v.extend(W::pre_load(&context)?);
117        Ok(v)
118    }
119
120    fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
121        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
122        let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
123        let context = context.clone_with_base_key(base_key);
124        let inner = W::post_load(
125            context,
126            values.get(1..).ok_or(ViewError::PostLoadValuesError)?,
127        )?;
128        Ok(Self {
129            _phantom: PhantomData,
130            stored_hash: hash,
131            hash,
132            inner,
133        })
134    }
135
136    async fn load(context: Self::Context) -> Result<Self, ViewError> {
137        let keys = Self::pre_load(&context)?;
138        let values = context.store().read_multi_values_bytes(keys).await?;
139        Self::post_load(context, &values)
140    }
141
142    fn rollback(&mut self) {
143        self.inner.rollback();
144        self.hash = self.stored_hash;
145    }
146
147    async fn has_pending_changes(&self) -> bool {
148        self.inner.has_pending_changes().await
149    }
150
151    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
152        let mut inner_batch = Batch::new();
153        self.inner.flush(&mut inner_batch)?;
154        let hash = Self::make_hash(self.stored_hash, &inner_batch)?;
155        batch.operations.extend(inner_batch.operations);
156        if self.stored_hash != Some(hash) {
157            let mut key = self.inner.context().base_key().bytes.clone();
158            let tag = key.last_mut().unwrap();
159            *tag = KeyTag::Hash as u8;
160            batch.put_key_value(key, &hash)?;
161            self.stored_hash = Some(hash);
162        }
163        // Remember the hash.
164        self.hash = Some(hash);
165        // Never delete the stored hash, even if the inner view was cleared.
166        Ok(false)
167    }
168
169    fn clear(&mut self) {
170        self.inner.clear();
171        self.hash = None;
172    }
173}
174
175impl<W> ClonableView for HistoricallyHashableView<W::Context, W>
176where
177    W: ClonableView,
178{
179    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
180        Ok(HistoricallyHashableView {
181            _phantom: PhantomData,
182            stored_hash: self.stored_hash,
183            hash: self.hash,
184            inner: self.inner.clone_unchecked()?,
185        })
186    }
187}
188
189impl<W: ClonableView> HistoricallyHashableView<W::Context, W> {
190    /// Obtains a hash of the history of the changes in the view.
191    pub async fn historical_hash(&mut self) -> Result<HasherOutput, ViewError> {
192        if let Some(hash) = self.hash {
193            return Ok(hash);
194        }
195        let mut batch = Batch::new();
196        if self.inner.has_pending_changes().await {
197            let mut inner = self.inner.clone_unchecked()?;
198            inner.flush(&mut batch)?;
199        }
200        let hash = Self::make_hash(self.stored_hash, &batch)?;
201        // Remember the hash that we just computed.
202        self.hash = Some(hash);
203        Ok(hash)
204    }
205}
206
207impl<C, W> Deref for HistoricallyHashableView<C, W> {
208    type Target = W;
209
210    fn deref(&self) -> &W {
211        &self.inner
212    }
213}
214
215impl<C, W> DerefMut for HistoricallyHashableView<C, W> {
216    fn deref_mut(&mut self) -> &mut W {
217        // Clear the memoized hash.
218        self.hash = None;
219        &mut self.inner
220    }
221}
222
223#[cfg(with_graphql)]
224mod graphql {
225    use std::borrow::Cow;
226
227    use super::HistoricallyHashableView;
228    use crate::context::Context;
229
230    impl<C, W> async_graphql::OutputType for HistoricallyHashableView<C, W>
231    where
232        C: Context,
233        W: async_graphql::OutputType + Send + Sync,
234    {
235        fn type_name() -> Cow<'static, str> {
236            W::type_name()
237        }
238
239        fn qualified_type_name() -> String {
240            W::qualified_type_name()
241        }
242
243        fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
244            W::create_type_info(registry)
245        }
246
247        async fn resolve(
248            &self,
249            ctx: &async_graphql::ContextSelectionSet<'_>,
250            field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
251        ) -> async_graphql::ServerResult<async_graphql::Value> {
252            self.inner.resolve(ctx, field).await
253        }
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use crate::{
261        context::MemoryContext, register_view::RegisterView, store::WritableKeyValueStore as _,
262    };
263
264    #[tokio::test]
265    async fn test_historically_hashable_view_initial_state() -> Result<(), ViewError> {
266        let context = MemoryContext::new_for_testing(());
267        let mut view =
268            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
269
270        // Initially should have no pending changes
271        assert!(!view.has_pending_changes().await);
272
273        // Initial hash should be the hash of an empty batch with default stored_hash
274        let hash = view.historical_hash().await?;
275        assert_eq!(hash, HasherOutput::default());
276
277        Ok(())
278    }
279
280    #[tokio::test]
281    async fn test_historically_hashable_view_hash_changes_with_modifications(
282    ) -> Result<(), ViewError> {
283        let context = MemoryContext::new_for_testing(());
284        let mut view =
285            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
286
287        // Get initial hash
288        let hash0 = view.historical_hash().await?;
289
290        // Set a value
291        view.set(42);
292        assert!(view.has_pending_changes().await);
293
294        // Hash should change after modification
295        let hash1 = view.historical_hash().await?;
296
297        // Calling `historical_hash` doesn't flush changes.
298        assert!(view.has_pending_changes().await);
299        assert_ne!(hash0, hash1);
300
301        // Flush and verify hash is stored
302        let mut batch = Batch::new();
303        view.flush(&mut batch)?;
304        context.store().write_batch(batch).await?;
305        assert!(!view.has_pending_changes().await);
306        assert_eq!(hash1, view.historical_hash().await?);
307
308        // Make another modification
309        view.set(84);
310        let hash2 = view.historical_hash().await?;
311        assert_ne!(hash1, hash2);
312
313        Ok(())
314    }
315
316    #[tokio::test]
317    async fn test_historically_hashable_view_reloaded() -> Result<(), ViewError> {
318        let context = MemoryContext::new_for_testing(());
319        let mut view =
320            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
321
322        // Set initial value and flush
323        view.set(42);
324        let mut batch = Batch::new();
325        view.flush(&mut batch)?;
326        context.store().write_batch(batch).await?;
327
328        let hash_after_flush = view.historical_hash().await?;
329
330        // Reload the view
331        let mut view2 =
332            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
333
334        // Hash should be the same (loaded from storage)
335        let hash_reloaded = view2.historical_hash().await?;
336        assert_eq!(hash_after_flush, hash_reloaded);
337
338        Ok(())
339    }
340
341    #[tokio::test]
342    async fn test_historically_hashable_view_rollback() -> Result<(), ViewError> {
343        let context = MemoryContext::new_for_testing(());
344        let mut view =
345            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
346
347        // Set and persist a value
348        view.set(42);
349        let mut batch = Batch::new();
350        view.flush(&mut batch)?;
351        context.store().write_batch(batch).await?;
352
353        let hash_before = view.historical_hash().await?;
354        assert!(!view.has_pending_changes().await);
355
356        // Make a modification
357        view.set(84);
358        assert!(view.has_pending_changes().await);
359        let hash_modified = view.historical_hash().await?;
360        assert_ne!(hash_before, hash_modified);
361
362        // Rollback
363        view.rollback();
364        assert!(!view.has_pending_changes().await);
365
366        // Hash should return to previous value
367        let hash_after_rollback = view.historical_hash().await?;
368        assert_eq!(hash_before, hash_after_rollback);
369
370        Ok(())
371    }
372
373    #[tokio::test]
374    async fn test_historically_hashable_view_clear() -> Result<(), ViewError> {
375        let context = MemoryContext::new_for_testing(());
376        let mut view =
377            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
378
379        // Set and persist a value
380        view.set(42);
381        let mut batch = Batch::new();
382        view.flush(&mut batch)?;
383        context.store().write_batch(batch).await?;
384
385        assert_ne!(view.historical_hash().await?, HasherOutput::default());
386
387        // Clear the view
388        view.clear();
389        assert!(view.has_pending_changes().await);
390
391        // Flush the clear operation
392        let mut batch = Batch::new();
393        let delete_view = view.flush(&mut batch)?;
394        assert!(!delete_view);
395        context.store().write_batch(batch).await?;
396
397        // Verify the view is not reset to default
398        assert_ne!(view.historical_hash().await?, HasherOutput::default());
399
400        Ok(())
401    }
402
403    #[tokio::test]
404    async fn test_historically_hashable_view_clone_unchecked() -> Result<(), ViewError> {
405        let context = MemoryContext::new_for_testing(());
406        let mut view =
407            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
408
409        // Set a value
410        view.set(42);
411        let mut batch = Batch::new();
412        view.flush(&mut batch)?;
413        context.store().write_batch(batch).await?;
414
415        let original_hash = view.historical_hash().await?;
416
417        // Clone the view
418        let mut cloned_view = view.clone_unchecked()?;
419
420        // Verify the clone has the same hash initially
421        let cloned_hash = cloned_view.historical_hash().await?;
422        assert_eq!(original_hash, cloned_hash);
423
424        // Modify the clone
425        cloned_view.set(84);
426        let cloned_hash_after = cloned_view.historical_hash().await?;
427        assert_ne!(original_hash, cloned_hash_after);
428
429        // Original should be unchanged
430        let original_hash_after = view.historical_hash().await?;
431        assert_eq!(original_hash, original_hash_after);
432
433        Ok(())
434    }
435
436    #[tokio::test]
437    async fn test_historically_hashable_view_flush_updates_stored_hash() -> Result<(), ViewError> {
438        let context = MemoryContext::new_for_testing(());
439        let mut view =
440            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
441
442        // Initial state - no stored hash
443        assert!(!view.has_pending_changes().await);
444
445        // Set a value
446        view.set(42);
447        assert!(view.has_pending_changes().await);
448
449        let hash_before_flush = view.historical_hash().await?;
450
451        // Flush - this should update stored_hash
452        let mut batch = Batch::new();
453        let delete_view = view.flush(&mut batch)?;
454        assert!(!delete_view);
455        context.store().write_batch(batch).await?;
456
457        assert!(!view.has_pending_changes().await);
458
459        // Make another change
460        view.set(84);
461        let hash_after_second_change = view.historical_hash().await?;
462
463        // The new hash should be based on the previous stored hash
464        assert_ne!(hash_before_flush, hash_after_second_change);
465
466        Ok(())
467    }
468
469    #[tokio::test]
470    async fn test_historically_hashable_view_deref() -> Result<(), ViewError> {
471        let context = MemoryContext::new_for_testing(());
472        let mut view =
473            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
474
475        // Test Deref - we can access inner view methods directly
476        view.set(42);
477        assert_eq!(*view.get(), 42);
478
479        // Test DerefMut
480        view.set(84);
481        assert_eq!(*view.get(), 84);
482
483        Ok(())
484    }
485
486    #[tokio::test]
487    async fn test_historically_hashable_view_sequential_modifications() -> Result<(), ViewError> {
488        async fn get_hash(values: &[u32]) -> Result<HasherOutput, ViewError> {
489            let context = MemoryContext::new_for_testing(());
490            let mut view =
491                HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
492
493            let mut previous_hash = view.historical_hash().await?;
494            for &value in values {
495                view.set(value);
496                if value % 2 == 0 {
497                    // Immediately save after odd values.
498                    let mut batch = Batch::new();
499                    view.flush(&mut batch)?;
500                    context.store().write_batch(batch).await?;
501                }
502                let current_hash = view.historical_hash().await?;
503                assert_ne!(previous_hash, current_hash);
504                previous_hash = current_hash;
505            }
506            Ok(previous_hash)
507        }
508
509        let h1 = get_hash(&[10, 20, 30, 40, 50]).await?;
510        let h2 = get_hash(&[20, 30, 40, 50]).await?;
511        let h3 = get_hash(&[20, 21, 30, 40, 50]).await?;
512        assert_ne!(h1, h2);
513        assert_eq!(h2, h3);
514        Ok(())
515    }
516
517    #[tokio::test]
518    async fn test_historically_hashable_view_flush_with_no_hash_change() -> Result<(), ViewError> {
519        let context = MemoryContext::new_for_testing(());
520        let mut view =
521            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
522
523        // Set and flush a value
524        view.set(42);
525        let mut batch = Batch::new();
526        view.flush(&mut batch)?;
527        context.store().write_batch(batch).await?;
528
529        let hash_before = view.historical_hash().await?;
530
531        // Flush again without changes - no new hash should be stored
532        let mut batch = Batch::new();
533        view.flush(&mut batch)?;
534        assert!(batch.is_empty());
535        context.store().write_batch(batch).await?;
536
537        let hash_after = view.historical_hash().await?;
538        assert_eq!(hash_before, hash_after);
539
540        Ok(())
541    }
542}