1use std::{
5 marker::PhantomData,
6 ops::{Deref, DerefMut},
7 sync::Mutex,
8};
9
10use allocative::Allocative;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::visit_allocative_simple;
14
15use crate::{
16 batch::Batch,
17 common::from_bytes_option,
18 context::Context,
19 store::ReadableKeyValueStore as _,
20 views::{ClonableView, Hasher, HasherOutput, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
21};
22
23#[cfg(with_metrics)]
24mod metrics {
25 use std::sync::LazyLock;
26
27 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
28 use prometheus::HistogramVec;
29
30 pub static HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> =
32 LazyLock::new(|| {
33 register_histogram_vec(
34 "historically_hashable_view_hash_runtime",
35 "HistoricallyHashableView hash runtime",
36 &[],
37 exponential_bucket_latencies(5.0),
38 )
39 });
40}
41
42#[derive(Debug, Allocative)]
44#[allocative(bound = "C, W: Allocative")]
45pub struct HistoricallyHashableView<C, W> {
46 #[allocative(visit = visit_allocative_simple)]
48 stored_hash: Option<HasherOutput>,
49 inner: W,
51 #[allocative(visit = visit_allocative_simple)]
53 hash: Mutex<Option<HasherOutput>>,
54 #[allocative(skip)]
56 _phantom: PhantomData<C>,
57}
58
59#[repr(u8)]
61enum KeyTag {
62 Inner = MIN_VIEW_TAG,
64 Hash,
66}
67
68impl<C, W> HistoricallyHashableView<C, W> {
69 fn make_hash(
70 stored_hash: Option<HasherOutput>,
71 batch: &Batch,
72 ) -> Result<HasherOutput, ViewError> {
73 #[cfg(with_metrics)]
74 let _hash_latency = metrics::HISTORICALLY_HASHABLE_VIEW_HASH_RUNTIME.measure_latency();
75 let stored_hash = stored_hash.unwrap_or_default();
76 if batch.is_empty() {
77 return Ok(stored_hash);
78 }
79 let mut hasher = sha3::Sha3_256::default();
80 hasher.update_with_bytes(&stored_hash)?;
81 hasher.update_with_bcs_bytes(&batch)?;
82 Ok(hasher.finalize())
83 }
84}
85
86impl<C, W, C2> ReplaceContext<C2> for HistoricallyHashableView<C, W>
87where
88 W: View<Context = C> + ReplaceContext<C2>,
89 C: Context,
90 C2: Context,
91{
92 type Target = HistoricallyHashableView<C2, <W as ReplaceContext<C2>>::Target>;
93
94 async fn with_context(
95 &mut self,
96 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
97 ) -> Self::Target {
98 HistoricallyHashableView {
99 _phantom: PhantomData,
100 stored_hash: self.stored_hash,
101 hash: Mutex::new(*self.hash.get_mut().unwrap()),
102 inner: self.inner.with_context(ctx).await,
103 }
104 }
105}
106
107impl<W> View for HistoricallyHashableView<W::Context, W>
108where
109 W: View,
110{
111 const NUM_INIT_KEYS: usize = 1 + W::NUM_INIT_KEYS;
112
113 type Context = W::Context;
114
115 fn context(&self) -> Self::Context {
116 self.inner.context().clone_with_trimmed_key(1)
118 }
119
120 fn pre_load(context: &Self::Context) -> Result<Vec<Vec<u8>>, ViewError> {
121 let mut v = vec![context.base_key().base_tag(KeyTag::Hash as u8)];
122 let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
123 let context = context.clone_with_base_key(base_key);
124 v.extend(W::pre_load(&context)?);
125 Ok(v)
126 }
127
128 fn post_load(context: Self::Context, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
129 let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
130 let base_key = context.base_key().base_tag(KeyTag::Inner as u8);
131 let context = context.clone_with_base_key(base_key);
132 let inner = W::post_load(
133 context,
134 values.get(1..).ok_or(ViewError::PostLoadValuesError)?,
135 )?;
136 Ok(Self {
137 _phantom: PhantomData,
138 stored_hash: hash,
139 hash: Mutex::new(hash),
140 inner,
141 })
142 }
143
144 async fn load(context: Self::Context) -> Result<Self, ViewError> {
145 let keys = Self::pre_load(&context)?;
146 let values = context.store().read_multi_values_bytes(&keys).await?;
147 Self::post_load(context, &values)
148 }
149
150 fn rollback(&mut self) {
151 self.inner.rollback();
152 *self.hash.get_mut().unwrap() = self.stored_hash;
153 }
154
155 async fn has_pending_changes(&self) -> bool {
156 self.inner.has_pending_changes().await
157 }
158
159 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
160 let mut inner_batch = Batch::new();
161 self.inner.pre_save(&mut inner_batch)?;
162 let new_hash = {
163 let mut maybe_hash = self.hash.lock().unwrap();
164 match maybe_hash.as_mut() {
165 Some(hash) => *hash,
166 None => {
167 let hash = Self::make_hash(self.stored_hash, &inner_batch)?;
168 *maybe_hash = Some(hash);
169 hash
170 }
171 }
172 };
173 batch.operations.extend(inner_batch.operations);
174
175 if self.stored_hash != Some(new_hash) {
176 let mut key = self.inner.context().base_key().bytes.clone();
177 let tag = key.last_mut().unwrap();
178 *tag = KeyTag::Hash as u8;
179 batch.put_key_value(key, &new_hash)?;
180 }
181 Ok(false)
183 }
184
185 fn post_save(&mut self) {
186 let new_hash = self
187 .hash
188 .get_mut()
189 .unwrap()
190 .expect("hash should be computed in pre_save");
191 self.stored_hash = Some(new_hash);
192 self.inner.post_save();
193 }
194
195 fn clear(&mut self) {
196 self.inner.clear();
197 *self.hash.get_mut().unwrap() = None;
198 }
199}
200
201impl<W> ClonableView for HistoricallyHashableView<W::Context, W>
202where
203 W: ClonableView,
204{
205 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
206 Ok(HistoricallyHashableView {
207 _phantom: PhantomData,
208 stored_hash: self.stored_hash,
209 hash: Mutex::new(*self.hash.get_mut().unwrap()),
210 inner: self.inner.clone_unchecked()?,
211 })
212 }
213}
214
215impl<W: View> HistoricallyHashableView<W::Context, W> {
216 pub async fn historical_hash(&mut self) -> Result<HasherOutput, ViewError> {
218 if let Some(hash) = self.hash.get_mut().unwrap() {
219 return Ok(*hash);
220 }
221 let mut batch = Batch::new();
222 self.inner.pre_save(&mut batch)?;
223 let hash = Self::make_hash(self.stored_hash, &batch)?;
224 *self.hash.get_mut().unwrap() = Some(hash);
226 Ok(hash)
227 }
228}
229
230impl<C, W> Deref for HistoricallyHashableView<C, W> {
231 type Target = W;
232
233 fn deref(&self) -> &W {
234 &self.inner
235 }
236}
237
238impl<C, W> DerefMut for HistoricallyHashableView<C, W> {
239 fn deref_mut(&mut self) -> &mut W {
240 *self.hash.get_mut().unwrap() = None;
242 &mut self.inner
243 }
244}
245
246#[cfg(with_graphql)]
247mod graphql {
248 use std::borrow::Cow;
249
250 use super::HistoricallyHashableView;
251 use crate::context::Context;
252
253 impl<C, W> async_graphql::OutputType for HistoricallyHashableView<C, W>
254 where
255 C: Context,
256 W: async_graphql::OutputType + Send + Sync,
257 {
258 fn type_name() -> Cow<'static, str> {
259 W::type_name()
260 }
261
262 fn qualified_type_name() -> String {
263 W::qualified_type_name()
264 }
265
266 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
267 W::create_type_info(registry)
268 }
269
270 async fn resolve(
271 &self,
272 ctx: &async_graphql::ContextSelectionSet<'_>,
273 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
274 ) -> async_graphql::ServerResult<async_graphql::Value> {
275 self.inner.resolve(ctx, field).await
276 }
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283 use crate::{
284 context::MemoryContext, register_view::RegisterView, store::WritableKeyValueStore as _,
285 };
286
287 #[tokio::test]
288 async fn test_historically_hashable_view_initial_state() -> Result<(), ViewError> {
289 let context = MemoryContext::new_for_testing(());
290 let mut view =
291 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
292
293 assert!(!view.has_pending_changes().await);
295
296 let hash = view.historical_hash().await?;
298 assert_eq!(hash, HasherOutput::default());
299
300 Ok(())
301 }
302
303 #[tokio::test]
304 async fn test_historically_hashable_view_hash_changes_with_modifications(
305 ) -> Result<(), ViewError> {
306 let context = MemoryContext::new_for_testing(());
307 let mut view =
308 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
309
310 let hash0 = view.historical_hash().await?;
312
313 view.set(42);
315 assert!(view.has_pending_changes().await);
316
317 let hash1 = view.historical_hash().await?;
319
320 assert!(view.has_pending_changes().await);
322 assert_ne!(hash0, hash1);
323
324 let mut batch = Batch::new();
326 view.pre_save(&mut batch)?;
327 context.store().write_batch(batch).await?;
328 view.post_save();
329 assert!(!view.has_pending_changes().await);
330 assert_eq!(hash1, view.historical_hash().await?);
331
332 view.set(84);
334 let hash2 = view.historical_hash().await?;
335 assert_ne!(hash1, hash2);
336
337 Ok(())
338 }
339
340 #[tokio::test]
341 async fn test_historically_hashable_view_reloaded() -> Result<(), ViewError> {
342 let context = MemoryContext::new_for_testing(());
343 let mut view =
344 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
345
346 view.set(42);
348 let mut batch = Batch::new();
349 view.pre_save(&mut batch)?;
350 context.store().write_batch(batch).await?;
351 view.post_save();
352
353 let hash_after_flush = view.historical_hash().await?;
354
355 let mut view2 =
357 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
358
359 let hash_reloaded = view2.historical_hash().await?;
361 assert_eq!(hash_after_flush, hash_reloaded);
362
363 Ok(())
364 }
365
366 #[tokio::test]
367 async fn test_historically_hashable_view_rollback() -> Result<(), ViewError> {
368 let context = MemoryContext::new_for_testing(());
369 let mut view =
370 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
371
372 view.set(42);
374 let mut batch = Batch::new();
375 view.pre_save(&mut batch)?;
376 context.store().write_batch(batch).await?;
377 view.post_save();
378
379 let hash_before = view.historical_hash().await?;
380 assert!(!view.has_pending_changes().await);
381
382 view.set(84);
384 assert!(view.has_pending_changes().await);
385 let hash_modified = view.historical_hash().await?;
386 assert_ne!(hash_before, hash_modified);
387
388 view.rollback();
390 assert!(!view.has_pending_changes().await);
391
392 let hash_after_rollback = view.historical_hash().await?;
394 assert_eq!(hash_before, hash_after_rollback);
395
396 Ok(())
397 }
398
399 #[tokio::test]
400 async fn test_historically_hashable_view_clear() -> Result<(), ViewError> {
401 let context = MemoryContext::new_for_testing(());
402 let mut view =
403 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
404
405 view.set(42);
407 let mut batch = Batch::new();
408 view.pre_save(&mut batch)?;
409 context.store().write_batch(batch).await?;
410 view.post_save();
411
412 assert_ne!(view.historical_hash().await?, HasherOutput::default());
413
414 view.clear();
416 assert!(view.has_pending_changes().await);
417
418 let mut batch = Batch::new();
420 let delete_view = view.pre_save(&mut batch)?;
421 assert!(!delete_view);
422 context.store().write_batch(batch).await?;
423 view.post_save();
424
425 assert_ne!(view.historical_hash().await?, HasherOutput::default());
427
428 Ok(())
429 }
430
431 #[tokio::test]
432 async fn test_historically_hashable_view_clone_unchecked() -> Result<(), ViewError> {
433 let context = MemoryContext::new_for_testing(());
434 let mut view =
435 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
436
437 view.set(42);
439 let mut batch = Batch::new();
440 view.pre_save(&mut batch)?;
441 context.store().write_batch(batch).await?;
442 view.post_save();
443
444 let original_hash = view.historical_hash().await?;
445
446 let mut cloned_view = view.clone_unchecked()?;
448
449 let cloned_hash = cloned_view.historical_hash().await?;
451 assert_eq!(original_hash, cloned_hash);
452
453 cloned_view.set(84);
455 let cloned_hash_after = cloned_view.historical_hash().await?;
456 assert_ne!(original_hash, cloned_hash_after);
457
458 let original_hash_after = view.historical_hash().await?;
460 assert_eq!(original_hash, original_hash_after);
461
462 Ok(())
463 }
464
465 #[tokio::test]
466 async fn test_historically_hashable_view_flush_updates_stored_hash() -> Result<(), ViewError> {
467 let context = MemoryContext::new_for_testing(());
468 let mut view =
469 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
470
471 assert!(!view.has_pending_changes().await);
473
474 view.set(42);
476 assert!(view.has_pending_changes().await);
477
478 let hash_before_flush = view.historical_hash().await?;
479
480 let mut batch = Batch::new();
482 let delete_view = view.pre_save(&mut batch)?;
483 assert!(!delete_view);
484 context.store().write_batch(batch).await?;
485 view.post_save();
486
487 assert!(!view.has_pending_changes().await);
488
489 view.set(84);
491 let hash_after_second_change = view.historical_hash().await?;
492
493 assert_ne!(hash_before_flush, hash_after_second_change);
495
496 Ok(())
497 }
498
499 #[tokio::test]
500 async fn test_historically_hashable_view_deref() -> Result<(), ViewError> {
501 let context = MemoryContext::new_for_testing(());
502 let mut view =
503 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
504
505 view.set(42);
507 assert_eq!(*view.get(), 42);
508
509 view.set(84);
511 assert_eq!(*view.get(), 84);
512
513 Ok(())
514 }
515
516 #[tokio::test]
517 async fn test_historically_hashable_view_sequential_modifications() -> Result<(), ViewError> {
518 async fn get_hash(values: &[u32]) -> Result<HasherOutput, ViewError> {
519 let context = MemoryContext::new_for_testing(());
520 let mut view =
521 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
522
523 let mut previous_hash = view.historical_hash().await?;
524 for &value in values {
525 view.set(value);
526 if value % 2 == 0 {
527 let mut batch = Batch::new();
529 view.pre_save(&mut batch)?;
530 context.store().write_batch(batch).await?;
531 view.post_save();
532 }
533 let current_hash = view.historical_hash().await?;
534 assert_ne!(previous_hash, current_hash);
535 previous_hash = current_hash;
536 }
537 Ok(previous_hash)
538 }
539
540 let h1 = get_hash(&[10, 20, 30, 40, 50]).await?;
541 let h2 = get_hash(&[20, 30, 40, 50]).await?;
542 let h3 = get_hash(&[20, 21, 30, 40, 50]).await?;
543 assert_ne!(h1, h2);
544 assert_eq!(h2, h3);
545 Ok(())
546 }
547
548 #[tokio::test]
549 async fn test_historically_hashable_view_flush_with_no_hash_change() -> Result<(), ViewError> {
550 let context = MemoryContext::new_for_testing(());
551 let mut view =
552 HistoricallyHashableView::<_, RegisterView<_, u32>>::load(context.clone()).await?;
553
554 view.set(42);
556 let mut batch = Batch::new();
557 view.pre_save(&mut batch)?;
558 context.store().write_batch(batch).await?;
559 view.post_save();
560
561 let hash_before = view.historical_hash().await?;
562
563 let mut batch = Batch::new();
565 view.pre_save(&mut batch)?;
566 assert!(batch.is_empty());
567 context.store().write_batch(batch).await?;
568 view.post_save();
569
570 let hash_after = view.historical_hash().await?;
571 assert_eq!(hash_before, hash_after);
572
573 Ok(())
574 }
575}