linera_views/views/
historical_hash_wrapper.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    marker::PhantomData,
6    ops::{Deref, DerefMut},
7    sync::Mutex,
8};
9
10use allocative::Allocative;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::visit_allocative_simple;
14
15use crate::{
16    batch::Batch,
17    common::from_bytes_option,
18    context::Context,
19    store::ReadableKeyValueStore as _,
20    views::{ClonableView, Hasher, HasherOutput, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
21};
22
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::HistogramVec;

    /// Histogram of the time spent computing the historical hash of a view.
    ///
    /// The histogram has no label dimensions and is registered lazily on first access.
    pub static HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            let buckets = prometheus_util::exponential_bucket_latencies(5.0);
            prometheus_util::register_histogram_vec(
                "historically_hashable_view_hash_runtime",
                "HistoricallyHashableView hash runtime",
                &[],
                buckets,
            )
        });
}
41
/// Wrapper to compute the hash of the view based on its history of modifications.
///
/// The hash is chained: a new hash is derived from the previously stored hash together
/// with the batch of pending changes produced by the inner view. When there are no
/// pending changes, the previously stored hash (or the default hash if there is none)
/// is returned unchanged.
///
/// The wrapper dereferences to the inner view `W`. Taking a mutable reference to the
/// inner view discards the memoized hash so that it is recomputed on the next request.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, W: Allocative")]
pub struct HistoricallyHashableView<C, W> {
    /// The hash in storage.
    ///
    /// Decoded from the value read under the hash key when the view is loaded, and
    /// replaced by the latest hash in `post_save`.
    #[allocative(visit = visit_allocative_simple)]
    stored_hash: Option<HasherOutput>,
    /// The inner view.
    inner: W,
    /// Memoized hash, if any.
    ///
    /// `None` means the hash must be recomputed from `stored_hash` and the pending
    /// changes of the inner view. Kept behind a `Mutex` so that `pre_save`, which only
    /// has `&self`, can fill it in.
    #[allocative(visit = visit_allocative_simple)]
    hash: Mutex<Option<HasherOutput>>,
    /// Track context type.
    #[allocative(skip)]
    _phantom: PhantomData<C>,
}
58
/// Key tags to create the sub-keys of a `HistoricallyHashableView` on top of the base key.
///
/// The tags are single bytes appended to the base key; variants are numbered
/// consecutively starting at `MIN_VIEW_TAG`.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the indices of the view.
    ///
    /// This tag is appended to the base key to form the base key of the inner view.
    Inner = MIN_VIEW_TAG,
    /// Prefix for the hash.
    ///
    /// The historical hash is stored under the base key followed by this tag.
    Hash,
}
67
68impl<C, W> HistoricallyHashableView<C, W> {
69    fn make_hash(
70        stored_hash: Option<HasherOutput>,
71        batch: &Batch,
72    ) -> Result<HasherOutput, ViewError> {
73        #[cfg(with_metrics)]
74        let _hash_latency = metrics::HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME.measure_latency();
75        let stored_hash = stored_hash.unwrap_or_default();
76        if batch.is_empty() {
77            return Ok(stored_hash);
78        }
79        let mut hasher = sha3::Sha3_256::default();
80        hasher.update_with_bytes(&stored_hash)?;
81        hasher.update_with_bcs_bytes(&batch)?;
82        Ok(hasher.finalize())
83    }
84}
85
86impl<C, W, C2> ReplaceContext<C2> for HistoricallyHashableView<C, W>
87where
88    W: View<Context = C> + ReplaceContext<C2>,
89    C: Context,
90    C2: Context,
91{
92    type Target = HistoricallyHashableView<C2, <W as ReplaceContext<C2>>::Target>;
93
94    async fn with_context(
95        &mut self,
96        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
97    ) -> Self::Target {
98        HistoricallyHashableView {
99            _phantom: PhantomData,
100            stored_hash: self.stored_hash,
101            hash: Mutex::new(*self.hash.get_mut().unwrap()),
102            inner: self.inner.with_context(ctx).await,
103        }
104    }
105}
106
107impl<W> View for HistoricallyHashableView<W::Context, W>
108where
109    W: View,
110{
111    const NUM_INIT_KEYS: usize = 1 + W::NUM_INIT_KEYS;
112
113    type Context = W::Context;
114
115    fn context(&self) -> Self::Context {
116        // The inner context has our base key plus the KeyTag::Inner byte
117        self.inner.context().clone_with_trimmed_key(1)
118    }
119
120    fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
121        let mut v = vec![context.base_key().base_tag(KeyTag::Hash as u8)];
122        let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
123        let context = context.clone_with_base_key(base_key);
124        v.extend(W::pre_load(&context)?);
125        Ok(v)
126    }
127
128    fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
129        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
130        let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
131        let context = context.clone_with_base_key(base_key);
132        let inner = W::post_load(
133            context,
134            values.get(1..).ok_or(ViewError::PostLoadValuesError)?,
135        )?;
136        Ok(Self {
137            _phantom: PhantomData,
138            stored_hash: hash,
139            hash: Mutex::new(hash),
140            inner,
141        })
142    }
143
144    async fn load(context: Self::Context) -> Result<Self, ViewError> {
145        let keys = Self::pre_load(&context)?;
146        let values = context.store().read_multi_values_bytes(&keys).await?;
147        Self::post_load(context, &values)
148    }
149
150    fn rollback(&mut self) {
151        self.inner.rollback();
152        *self.hash.get_mut().unwrap() = self.stored_hash;
153    }
154
155    async fn has_pending_changes(&self) -> bool {
156        self.inner.has_pending_changes().await
157    }
158
159    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
160        let mut inner_batch = Batch::new();
161        self.inner.pre_save(&mut inner_batch)?;
162        let new_hash = {
163            let mut maybe_hash = self.hash.lock().unwrap();
164            match maybe_hash.as_mut() {
165                Some(hash) => *hash,
166                None => {
167                    let hash = Self::make_hash(self.stored_hash, &inner_batch)?;
168                    *maybe_hash = Some(hash);
169                    hash
170                }
171            }
172        };
173        batch.operations.extend(inner_batch.operations);
174
175        if self.stored_hash != Some(new_hash) {
176            let mut key = self.inner.context().base_key().bytes.clone();
177            let tag = key.last_mut().unwrap();
178            *tag = KeyTag::Hash as u8;
179            batch.put_key_value(key, &new_hash)?;
180        }
181        // Never delete the stored hash, even if the inner view was cleared.
182        Ok(false)
183    }
184
185    fn post_save(&mut self) {
186        let new_hash = self
187            .hash
188            .get_mut()
189            .unwrap()
190            .expect("hash should be computed in pre_save");
191        self.stored_hash = Some(new_hash);
192        self.inner.post_save();
193    }
194
195    fn clear(&mut self) {
196        self.inner.clear();
197        *self.hash.get_mut().unwrap() = None;
198    }
199}
200
201impl<W> ClonableView for HistoricallyHashableView<W::Context, W>
202where
203    W: ClonableView,
204{
205    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
206        Ok(HistoricallyHashableView {
207            _phantom: PhantomData,
208            stored_hash: self.stored_hash,
209            hash: Mutex::new(*self.hash.get_mut().unwrap()),
210            inner: self.inner.clone_unchecked()?,
211        })
212    }
213}
214
215impl<W: View> HistoricallyHashableView<W::Context, W> {
216    /// Obtains a hash of the history of the changes in the view.
217    pub async fn historical_hash(&mut self) -> Result<HasherOutput, ViewError> {
218        if let Some(hash) = self.hash.get_mut().unwrap() {
219            return Ok(*hash);
220        }
221        let mut batch = Batch::new();
222        self.inner.pre_save(&mut batch)?;
223        let hash = Self::make_hash(self.stored_hash, &batch)?;
224        // Remember the hash that we just computed.
225        *self.hash.get_mut().unwrap() = Some(hash);
226        Ok(hash)
227    }
228}
229
230impl<C, W> Deref for HistoricallyHashableView<C, W> {
231    type Target = W;
232
233    fn deref(&self) -> &W {
234        &self.inner
235    }
236}
237
238impl<C, W> DerefMut for HistoricallyHashableView<C, W> {
239    fn deref_mut(&mut self) -> &mut W {
240        // Clear the memoized hash.
241        *self.hash.get_mut().unwrap() = None;
242        &mut self.inner
243    }
244}
245
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use async_graphql::{
        parser::types::Field, registry::Registry, ContextSelectionSet, OutputType, Positioned,
        ServerResult, Value,
    };

    use super::HistoricallyHashableView;
    use crate::context::Context;

    /// Exposes the wrapper in GraphQL exactly as its inner view: every method
    /// delegates to the implementation for `W`.
    impl<C, W> OutputType for HistoricallyHashableView<C, W>
    where
        C: Context,
        W: OutputType + Send + Sync,
    {
        fn type_name() -> Cow<'static, str> {
            <W as OutputType>::type_name()
        }

        fn qualified_type_name() -> String {
            <W as OutputType>::qualified_type_name()
        }

        fn create_type_info(registry: &mut Registry) -> String {
            <W as OutputType>::create_type_info(registry)
        }

        async fn resolve(
            &self,
            ctx: &ContextSelectionSet<'_>,
            field: &Positioned<Field>,
        ) -> ServerResult<Value> {
            <W as OutputType>::resolve(&self.inner, ctx, field).await
        }
    }
}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        context::MemoryContext, register_view::RegisterView, store::WritableKeyValueStore as _,
    };

    /// A freshly loaded view has no pending changes and reports the default hash.
    #[tokio::test]
    async fn test_historically_hashable_view_initial_state() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Initially should have no pending changes
        assert!(!view.has_pending_changes().await);

        // With no stored hash and no pending changes, the hash is the default `HasherOutput`.
        let hash = view.historical_hash().await?;
        assert_eq!(hash, HasherOutput::default());

        Ok(())
    }

    /// Modifying the inner view changes the hash, and saving keeps the hash that was
    /// computed before the save.
    #[tokio::test]
    async fn test_historically_hashable_view_hash_changes_with_modifications(
    ) -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Get initial hash
        let hash0 = view.historical_hash().await?;

        // Set a value
        view.set(42);
        assert!(view.has_pending_changes().await);

        // Hash should change after modification
        let hash1 = view.historical_hash().await?;

        // Calling `historical_hash` doesn't flush changes.
        assert!(view.has_pending_changes().await);
        assert_ne!(hash0, hash1);

        // Flush and verify hash is stored
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        context.store().write_batch(batch).await?;
        view.post_save();
        assert!(!view.has_pending_changes().await);
        assert_eq!(hash1, view.historical_hash().await?);

        // Make another modification
        view.set(84);
        let hash2 = view.historical_hash().await?;
        assert_ne!(hash1, hash2);

        Ok(())
    }

    /// A view loaded again from the same storage reports the hash that was saved.
    #[tokio::test]
    async fn test_historically_hashable_view_reloaded() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Set initial value and flush
        view.set(42);
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        context.store().write_batch(batch).await?;
        view.post_save();

        let hash_after_flush = view.historical_hash().await?;

        // Reload the view
        let mut view2 =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Hash should be the same (loaded from storage)
        let hash_reloaded = view2.historical_hash().await?;
        assert_eq!(hash_after_flush, hash_reloaded);

        Ok(())
    }

    /// Rolling back unsaved changes restores the hash from before those changes.
    #[tokio::test]
    async fn test_historically_hashable_view_rollback() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Set and persist a value
        view.set(42);
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        context.store().write_batch(batch).await?;
        view.post_save();

        let hash_before = view.historical_hash().await?;
        assert!(!view.has_pending_changes().await);

        // Make a modification
        view.set(84);
        assert!(view.has_pending_changes().await);
        let hash_modified = view.historical_hash().await?;
        assert_ne!(hash_before, hash_modified);

        // Rollback
        view.rollback();
        assert!(!view.has_pending_changes().await);

        // Hash should return to previous value
        let hash_after_rollback = view.historical_hash().await?;
        assert_eq!(hash_before, hash_after_rollback);

        Ok(())
    }

    /// Clearing the inner view does not delete the wrapper or reset the hash to default.
    #[tokio::test]
    async fn test_historically_hashable_view_clear() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Set and persist a value
        view.set(42);
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        context.store().write_batch(batch).await?;
        view.post_save();

        assert_ne!(view.historical_hash().await?, HasherOutput::default());

        // Clear the view
        view.clear();
        assert!(view.has_pending_changes().await);

        // Flush the clear operation
        let mut batch = Batch::new();
        let delete_view = view.pre_save(&mut batch)?;
        assert!(!delete_view);
        context.store().write_batch(batch).await?;
        view.post_save();

        // Verify the hash is not reset to default
        assert_ne!(view.historical_hash().await?, HasherOutput::default());

        Ok(())
    }

    /// A clone starts with the same hash as the original, and later changes to the
    /// clone do not affect the original's hash.
    #[tokio::test]
    async fn test_historically_hashable_view_clone_unchecked() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Set a value
        view.set(42);
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        context.store().write_batch(batch).await?;
        view.post_save();

        let original_hash = view.historical_hash().await?;

        // Clone the view
        let mut cloned_view = view.clone_unchecked()?;

        // Verify the clone has the same hash initially
        let cloned_hash = cloned_view.historical_hash().await?;
        assert_eq!(original_hash, cloned_hash);

        // Modify the clone
        cloned_view.set(84);
        let cloned_hash_after = cloned_view.historical_hash().await?;
        assert_ne!(original_hash, cloned_hash_after);

        // Original should be unchanged
        let original_hash_after = view.historical_hash().await?;
        assert_eq!(original_hash, original_hash_after);

        Ok(())
    }

    /// After a save, a further change yields a hash different from the saved one.
    #[tokio::test]
    async fn test_historically_hashable_view_flush_updates_stored_hash() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Initial state - no stored hash
        assert!(!view.has_pending_changes().await);

        // Set a value
        view.set(42);
        assert!(view.has_pending_changes().await);

        let hash_before_flush = view.historical_hash().await?;

        // Flush - this should update stored_hash
        let mut batch = Batch::new();
        let delete_view = view.pre_save(&mut batch)?;
        assert!(!delete_view);
        context.store().write_batch(batch).await?;
        view.post_save();

        assert!(!view.has_pending_changes().await);

        // Make another change
        view.set(84);
        let hash_after_second_change = view.historical_hash().await?;

        // The new hash should be based on the previous stored hash
        assert_ne!(hash_before_flush, hash_after_second_change);

        Ok(())
    }

    /// The wrapper gives direct access to the inner view's methods through
    /// `Deref` and `DerefMut`.
    #[tokio::test]
    async fn test_historically_hashable_view_deref() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Test Deref - we can access inner view methods directly
        view.set(42);
        assert_eq!(*view.get(), 42);

        // Test DerefMut
        view.set(84);
        assert_eq!(*view.get(), 84);

        Ok(())
    }

    /// The final hash depends on the sequence of saved states, not on intermediate
    /// values that were never saved.
    #[tokio::test]
    async fn test_historically_hashable_view_sequential_modifications() -> Result<(), ViewError> {
        // Applies `values` one by one to a fresh view, saving after each even value, and
        // returns the final hash. Every step is expected to change the hash.
        async fn get_hash(values: &[u32]) -> Result<HasherOutput, ViewError> {
            let context = MemoryContext::new_for_testing(());
            let mut view =
                HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

            let mut previous_hash = view.historical_hash().await?;
            for &value in values {
                view.set(value);
                if value % 2 == 0 {
                    // Immediately save after even values.
                    let mut batch = Batch::new();
                    view.pre_save(&mut batch)?;
                    context.store().write_batch(batch).await?;
                    view.post_save();
                }
                let current_hash = view.historical_hash().await?;
                assert_ne!(previous_hash, current_hash);
                previous_hash = current_hash;
            }
            Ok(previous_hash)
        }

        let h1 = get_hash(&[10, 20, 30, 40, 50]).await?;
        let h2 = get_hash(&[20, 30, 40, 50]).await?;
        let h3 = get_hash(&[20, 21, 30, 40, 50]).await?;
        // The extra saved value 10 is part of the history, so the hashes differ.
        assert_ne!(h1, h2);
        // The odd value 21 is never saved and is overwritten by 30 before the next save,
        // so it is expected to leave no trace in the history.
        assert_eq!(h2, h3);
        Ok(())
    }

    /// Saving again without any change produces an empty batch and keeps the hash.
    #[tokio::test]
    async fn test_historically_hashable_view_flush_with_no_hash_change() -> Result<(), ViewError> {
        let context = MemoryContext::new_for_testing(());
        let mut view =
            HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;

        // Set and flush a value
        view.set(42);
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        context.store().write_batch(batch).await?;
        view.post_save();

        let hash_before = view.historical_hash().await?;

        // Flush again without changes - no new hash should be stored
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        assert!(batch.is_empty());
        context.store().write_batch(batch).await?;
        view.post_save();

        let hash_after = view.historical_hash().await?;
        assert_eq!(hash_before, hash_after);

        Ok(())
    }
}
573        Ok(())
574    }
575}