1use std::{
5 marker::PhantomData,
6 ops::{Deref, DerefMut},
7};
8
9#[cfg(with_metrics)]
10use linera_base::prometheus_util::MeasureLatency as _;
11
12use crate::{
13 batch::Batch,
14 common::from_bytes_option,
15 context::Context,
16 store::ReadableKeyValueStore as _,
17 views::{ClonableView, Hasher, HasherOutput, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
18};
19
20#[cfg(with_metrics)]
21mod metrics {
22 use std::sync::LazyLock;
23
24 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
25 use prometheus::HistogramVec;
26
27 pub static HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
29 LazyLock::new(|| {
30 register_histogram_vec(
31 "historically_hashable_view_hash_runtime",
32 "HistoricallyHashableView hash runtime",
33 &[],
34 exponential_bucket_latencies(5.0),
35 )
36 });
37}
38
39#[derive(Debug)]
41pub struct HistoricallyHashableView<C, W> {
42 stored_hash: Option<HasherOutput>,
44 inner: W,
46 hash: Option<HasherOutput>,
48 _phantom: PhantomData<C>,
50}
51
52#[repr(u8)]
54enum KeyTag {
55 Inner = MIN_VIEW_TAG,
57 Hash,
59}
60
61impl<C, W> HistoricallyHashableView<C, W> {
62 fn make_hash(
63 stored_hash: Option<HasherOutput>,
64 batch: &Batch,
65 ) -> Result<HasherOutput, ViewError> {
66 #[cfg(with_metrics)]
67 let _hash_latency = metrics::HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME.measure_latency();
68 let stored_hash = stored_hash.unwrap_or_default();
69 if batch.is_empty() {
70 return Ok(stored_hash);
71 }
72 let mut hasher = sha3::Sha3_256::default();
73 hasher.update_with_bytes(&stored_hash)?;
74 hasher.update_with_bcs_bytes(&batch)?;
75 Ok(hasher.finalize())
76 }
77}
78
79impl<C, W, C2> ReplaceContext<C2> for HistoricallyHashableView<C, W>
80where
81 W: View<Context = C> + ReplaceContext<C2>,
82 C: Context,
83 C2: Context,
84{
85 type Target = HistoricallyHashableView<C2, <W as ReplaceContext<C2>>::Target>;
86
87 async fn with_context(
88 &mut self,
89 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
90 ) -> Self::Target {
91 HistoricallyHashableView {
92 _phantom: PhantomData,
93 stored_hash: self.stored_hash,
94 hash: self.hash,
95 inner: self.inner.with_context(ctx).await,
96 }
97 }
98}
99
100impl<W> View for HistoricallyHashableView<W::Context, W>
101where
102 W: View,
103{
104 const NUM_INIT_KEYS: usize = 1 + W::NUM_INIT_KEYS;
105
106 type Context = W::Context;
107
108 fn context(&self) -> &Self::Context {
109 self.inner.context()
110 }
111
112 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
113 let mut v = vec![context.base_key().base_tag(KeyTag::Hash as u8)];
114 let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
115 let context = context.clone_with_base_key(base_key);
116 v.extend(W::pre_load(&context)?);
117 Ok(v)
118 }
119
120 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
121 let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
122 let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
123 let context = context.clone_with_base_key(base_key);
124 let inner = W::post_load(
125 context,
126 values.get(1..).ok_or(ViewError::PostLoadValuesError)?,
127 )?;
128 Ok(Self {
129 _phantom: PhantomData,
130 stored_hash: hash,
131 hash,
132 inner,
133 })
134 }
135
136 async fn load(context: Self::Context) -> Result<Self, ViewError> {
137 let keys = Self::pre_load(&context)?;
138 let values = context.store().read_multi_values_bytes(keys).await?;
139 Self::post_load(context, &values)
140 }
141
142 fn rollback(&mut self) {
143 self.inner.rollback();
144 self.hash = self.stored_hash;
145 }
146
147 async fn has_pending_changes(&self) -> bool {
148 self.inner.has_pending_changes().await
149 }
150
151 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
152 let mut inner_batch = Batch::new();
153 self.inner.flush(&mut inner_batch)?;
154 let hash = Self::make_hash(self.stored_hash, &inner_batch)?;
155 batch.operations.extend(inner_batch.operations);
156 if self.stored_hash != Some(hash) {
157 let mut key = self.inner.context().base_key().bytes.clone();
158 let tag = key.last_mut().unwrap();
159 *tag = KeyTag::Hash as u8;
160 batch.put_key_value(key, &hash)?;
161 self.stored_hash = Some(hash);
162 }
163 self.hash = Some(hash);
165 Ok(false)
167 }
168
169 fn clear(&mut self) {
170 self.inner.clear();
171 self.hash = None;
172 }
173}
174
175impl<W> ClonableView for HistoricallyHashableView<W::Context, W>
176where
177 W: ClonableView,
178{
179 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
180 Ok(HistoricallyHashableView {
181 _phantom: PhantomData,
182 stored_hash: self.stored_hash,
183 hash: self.hash,
184 inner: self.inner.clone_unchecked()?,
185 })
186 }
187}
188
189impl<W: ClonableView> HistoricallyHashableView<W::Context, W> {
190 pub async fn historical_hash(&mut self) -> Result<HasherOutput, ViewError> {
192 if let Some(hash) = self.hash {
193 return Ok(hash);
194 }
195 let mut batch = Batch::new();
196 if self.inner.has_pending_changes().await {
197 let mut inner = self.inner.clone_unchecked()?;
198 inner.flush(&mut batch)?;
199 }
200 let hash = Self::make_hash(self.stored_hash, &batch)?;
201 self.hash = Some(hash);
203 Ok(hash)
204 }
205}
206
207impl<C, W> Deref for HistoricallyHashableView<C, W> {
208 type Target = W;
209
210 fn deref(&self) -> &W {
211 &self.inner
212 }
213}
214
215impl<C, W> DerefMut for HistoricallyHashableView<C, W> {
216 fn deref_mut(&mut self) -> &mut W {
217 self.hash = None;
219 &mut self.inner
220 }
221}
222
223#[cfg(with_graphql)]
224mod graphql {
225 use std::borrow::Cow;
226
227 use super::HistoricallyHashableView;
228 use crate::context::Context;
229
230 impl<C, W> async_graphql::OutputType for HistoricallyHashableView<C, W>
231 where
232 C: Context,
233 W: async_graphql::OutputType + Send + Sync,
234 {
235 fn type_name() -> Cow<'static, str> {
236 W::type_name()
237 }
238
239 fn qualified_type_name() -> String {
240 W::qualified_type_name()
241 }
242
243 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
244 W::create_type_info(registry)
245 }
246
247 async fn resolve(
248 &self,
249 ctx: &async_graphql::ContextSelectionSet<'_>,
250 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
251 ) -> async_graphql::ServerResult<async_graphql::Value> {
252 self.inner.resolve(ctx, field).await
253 }
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260 use crate::{
261 context::MemoryContext, register_view::RegisterView, store::WritableKeyValueStore as _,
262 };
263
264 #[tokio::test]
265 async fn test_historically_hashable_view_initial_state() -> Result<(), ViewError> {
266 let context = MemoryContext::new_for_testing(());
267 let mut view =
268 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
269
270 assert!(!view.has_pending_changes().await);
272
273 let hash = view.historical_hash().await?;
275 assert_eq!(hash, HasherOutput::default());
276
277 Ok(())
278 }
279
280 #[tokio::test]
281 async fn test_historically_hashable_view_hash_changes_with_modifications(
282 ) -> Result<(), ViewError> {
283 let context = MemoryContext::new_for_testing(());
284 let mut view =
285 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
286
287 let hash0 = view.historical_hash().await?;
289
290 view.set(42);
292 assert!(view.has_pending_changes().await);
293
294 let hash1 = view.historical_hash().await?;
296
297 assert!(view.has_pending_changes().await);
299 assert_ne!(hash0, hash1);
300
301 let mut batch = Batch::new();
303 view.flush(&mut batch)?;
304 context.store().write_batch(batch).await?;
305 assert!(!view.has_pending_changes().await);
306 assert_eq!(hash1, view.historical_hash().await?);
307
308 view.set(84);
310 let hash2 = view.historical_hash().await?;
311 assert_ne!(hash1, hash2);
312
313 Ok(())
314 }
315
316 #[tokio::test]
317 async fn test_historically_hashable_view_reloaded() -> Result<(), ViewError> {
318 let context = MemoryContext::new_for_testing(());
319 let mut view =
320 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
321
322 view.set(42);
324 let mut batch = Batch::new();
325 view.flush(&mut batch)?;
326 context.store().write_batch(batch).await?;
327
328 let hash_after_flush = view.historical_hash().await?;
329
330 let mut view2 =
332 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
333
334 let hash_reloaded = view2.historical_hash().await?;
336 assert_eq!(hash_after_flush, hash_reloaded);
337
338 Ok(())
339 }
340
341 #[tokio::test]
342 async fn test_historically_hashable_view_rollback() -> Result<(), ViewError> {
343 let context = MemoryContext::new_for_testing(());
344 let mut view =
345 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
346
347 view.set(42);
349 let mut batch = Batch::new();
350 view.flush(&mut batch)?;
351 context.store().write_batch(batch).await?;
352
353 let hash_before = view.historical_hash().await?;
354 assert!(!view.has_pending_changes().await);
355
356 view.set(84);
358 assert!(view.has_pending_changes().await);
359 let hash_modified = view.historical_hash().await?;
360 assert_ne!(hash_before, hash_modified);
361
362 view.rollback();
364 assert!(!view.has_pending_changes().await);
365
366 let hash_after_rollback = view.historical_hash().await?;
368 assert_eq!(hash_before, hash_after_rollback);
369
370 Ok(())
371 }
372
373 #[tokio::test]
374 async fn test_historically_hashable_view_clear() -> Result<(), ViewError> {
375 let context = MemoryContext::new_for_testing(());
376 let mut view =
377 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
378
379 view.set(42);
381 let mut batch = Batch::new();
382 view.flush(&mut batch)?;
383 context.store().write_batch(batch).await?;
384
385 assert_ne!(view.historical_hash().await?, HasherOutput::default());
386
387 view.clear();
389 assert!(view.has_pending_changes().await);
390
391 let mut batch = Batch::new();
393 let delete_view = view.flush(&mut batch)?;
394 assert!(!delete_view);
395 context.store().write_batch(batch).await?;
396
397 assert_ne!(view.historical_hash().await?, HasherOutput::default());
399
400 Ok(())
401 }
402
403 #[tokio::test]
404 async fn test_historically_hashable_view_clone_unchecked() -> Result<(), ViewError> {
405 let context = MemoryContext::new_for_testing(());
406 let mut view =
407 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
408
409 view.set(42);
411 let mut batch = Batch::new();
412 view.flush(&mut batch)?;
413 context.store().write_batch(batch).await?;
414
415 let original_hash = view.historical_hash().await?;
416
417 let mut cloned_view = view.clone_unchecked()?;
419
420 let cloned_hash = cloned_view.historical_hash().await?;
422 assert_eq!(original_hash, cloned_hash);
423
424 cloned_view.set(84);
426 let cloned_hash_after = cloned_view.historical_hash().await?;
427 assert_ne!(original_hash, cloned_hash_after);
428
429 let original_hash_after = view.historical_hash().await?;
431 assert_eq!(original_hash, original_hash_after);
432
433 Ok(())
434 }
435
436 #[tokio::test]
437 async fn test_historically_hashable_view_flush_updates_stored_hash() -> Result<(), ViewError> {
438 let context = MemoryContext::new_for_testing(());
439 let mut view =
440 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
441
442 assert!(!view.has_pending_changes().await);
444
445 view.set(42);
447 assert!(view.has_pending_changes().await);
448
449 let hash_before_flush = view.historical_hash().await?;
450
451 let mut batch = Batch::new();
453 let delete_view = view.flush(&mut batch)?;
454 assert!(!delete_view);
455 context.store().write_batch(batch).await?;
456
457 assert!(!view.has_pending_changes().await);
458
459 view.set(84);
461 let hash_after_second_change = view.historical_hash().await?;
462
463 assert_ne!(hash_before_flush, hash_after_second_change);
465
466 Ok(())
467 }
468
469 #[tokio::test]
470 async fn test_historically_hashable_view_deref() -> Result<(), ViewError> {
471 let context = MemoryContext::new_for_testing(());
472 let mut view =
473 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
474
475 view.set(42);
477 assert_eq!(*view.get(), 42);
478
479 view.set(84);
481 assert_eq!(*view.get(), 84);
482
483 Ok(())
484 }
485
486 #[tokio::test]
487 async fn test_historically_hashable_view_sequential_modifications() -> Result<(), ViewError> {
488 async fn get_hash(values: &[u32]) -> Result<HasherOutput, ViewError> {
489 let context = MemoryContext::new_for_testing(());
490 let mut view =
491 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
492
493 let mut previous_hash = view.historical_hash().await?;
494 for &value in values {
495 view.set(value);
496 if value % 2 == 0 {
497 let mut batch = Batch::new();
499 view.flush(&mut batch)?;
500 context.store().write_batch(batch).await?;
501 }
502 let current_hash = view.historical_hash().await?;
503 assert_ne!(previous_hash, current_hash);
504 previous_hash = current_hash;
505 }
506 Ok(previous_hash)
507 }
508
509 let h1 = get_hash(&[10, 20, 30, 40, 50]).await?;
510 let h2 = get_hash(&[20, 30, 40, 50]).await?;
511 let h3 = get_hash(&[20, 21, 30, 40, 50]).await?;
512 assert_ne!(h1, h2);
513 assert_eq!(h2, h3);
514 Ok(())
515 }
516
517 #[tokio::test]
518 async fn test_historically_hashable_view_flush_with_no_hash_change() -> Result<(), ViewError> {
519 let context = MemoryContext::new_for_testing(());
520 let mut view =
521 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
522
523 view.set(42);
525 let mut batch = Batch::new();
526 view.flush(&mut batch)?;
527 context.store().write_batch(batch).await?;
528
529 let hash_before = view.historical_hash().await?;
530
531 let mut batch = Batch::new();
533 view.flush(&mut batch)?;
534 assert!(batch.is_empty());
535 context.store().write_batch(batch).await?;
536
537 let hash_after = view.historical_hash().await?;
538 assert_eq!(hash_before, hash_after);
539
540 Ok(())
541 }
542}