1use std::{
5 collections::{BTreeMap, HashMap},
6 fmt::Debug,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_cache::ValueCache;
19use linera_chain::{
20 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
21 ChainStateView,
22};
23use linera_execution::{
24 BlobState, ExecutionRuntimeConfig, SharedCommittees, UserContractCode, UserServiceCode,
25 WasmRuntime,
26};
27use linera_views::{
28 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
29 batch::Batch,
30 context::ViewContext,
31 store::{
32 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
33 },
34 views::View,
35 ViewError,
36};
37use serde::{Deserialize, Serialize};
38use tracing::instrument;
39#[cfg(with_testing)]
40use {
41 futures::channel::oneshot::{self, Receiver},
42 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
43 std::cmp::Reverse,
44};
45
46use crate::{ChainRuntimeContext, Clock, Storage};
47
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, linear_bucket_interval,
        register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
    };
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Name of the label recording where a lookup was served from.
    pub(super) const SOURCE_LABEL: &str = "source";
    /// `source` label value for lookups answered by an in-memory cache.
    pub(super) const CACHE: &str = "cache";
    /// `source` label value for lookups that reached the database.
    pub(super) const DB: &str = "db";

    /// Counts existence checks for a single blob, by source.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts batched existence checks for multiple blobs, by source.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts existence checks for a blob state, by source.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts existence checks for a certificate, by source.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts reads of a single confirmed block, by source.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts batched reads of confirmed blocks, by source.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// Counts reads of a single blob, by source.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts reads of a blob state, by source.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts blob writes.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_blob",
            "The metric counting how often a blob is written to storage",
        )
    });

    /// Counts reads of a single certificate, by source.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts batched certificate reads, by source.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificate are read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts certificate writes.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
        )
    });

    /// Histogram of the serialized lite-certificate size, in bytes.
    pub(super) static CERTIFICATE_LITE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_lite_bytes",
            "Serialized size of the lite-certificate (signatures + metadata) in bytes",
            exponential_bucket_interval(128.0, 2_097_152.0),
        )
    });

    /// Histogram of the serialized certificate value size, in bytes.
    pub(super) static CERTIFICATE_VALUE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_value_bytes",
            "Serialized size of the certificate value (block payload) in bytes",
            exponential_bucket_interval(256.0, 16_777_216.0),
        )
    });

    /// Histogram of the number of validator signatures per written certificate.
    pub(super) static CERTIFICATE_SIGNER_COUNT: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_signer_count",
            "Number of validator signatures attached to each confirmed certificate",
            linear_bucket_interval(1.0, 1.0, 20.0),
        )
    });

    /// Histogram of the latency of loading a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Counts reads of a single event, by source.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts existence checks for an event, by source.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts event writes.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_event",
            "The metric counting how often an event is written to storage",
        )
    });

    /// Counts reads of the network description, by source.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts writes of the network description.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
        )
    });
}
273
/// Key of a blob's bytes inside its blob partition.
const BLOB_KEY: &[u8] = &[0];

/// Key of a blob's state inside its blob partition.
const BLOB_STATE_KEY: &[u8] = &[1];

/// Key of the serialized lite certificate inside a block-hash partition.
const LITE_CERTIFICATE_KEY: &[u8] = &[2];

/// Key of the serialized confirmed block inside a block-hash partition.
const BLOCK_KEY: &[u8] = &[3];

/// Key of the network description inside its singleton partition.
const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];

/// Returns the two keys under which a certificate is stored in its
/// block-hash partition: the lite certificate first, then the block.
fn get_block_keys() -> Vec<Vec<u8>> {
    [LITE_CERTIFICATE_KEY, BLOCK_KEY]
        .iter()
        .map(|key| key.to_vec())
        .collect()
}
292
/// A set of key-value writes grouped by the storage partition (root key) each
/// write targets. Built up by the `add_*` methods and consumed by
/// `DbStorage::write_batch`.
#[derive(Default)]
#[expect(clippy::type_complexity)]
struct MultiPartitionBatch {
    /// Maps a partition's root key to the `(key, value)` pairs to write there.
    keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
}

impl MultiPartitionBatch {
    /// Creates an empty batch.
    fn new() -> Self {
        Self::default()
    }

    /// Queues several key-value writes under the partition `root_key`.
    fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
        let entry = self.keys_value_bytes.entry(root_key).or_default();
        entry.extend(key_values);
    }

    /// Queues a single key-value write under the partition `root_key`.
    fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
        self.put_key_values(root_key, vec![(key, value)]);
    }

    /// Queues the write of a blob's bytes into the blob's own partition.
    fn add_blob(&mut self, blob: &Blob) {
        #[cfg(with_metrics)]
        metrics::WRITE_BLOB_COUNTER.inc();
        let root_key = RootKey::BlobId(blob.id()).bytes();
        let key = BLOB_KEY.to_vec();
        self.put_key_value(root_key, key, blob.bytes().to_vec());
    }

    /// Queues the write of a blob's state into the blob's partition.
    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
        let root_key = RootKey::BlobId(blob_id).bytes();
        let key = BLOB_STATE_KEY.to_vec();
        let value = bcs::to_bytes(blob_state)?;
        self.put_key_value(root_key, key, value);
        Ok(())
    }

    /// Queues all writes needed to persist a confirmed certificate:
    /// the lite certificate and the block under the block-hash partition,
    /// a height -> block-hash index entry under the chain's partition, and an
    /// event -> block-height index entry for every event emitted by the block.
    fn add_certificate(
        &mut self,
        certificate: &ConfirmedBlockCertificate,
    ) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        {
            metrics::WRITE_CERTIFICATE_COUNTER.inc();
            metrics::CERTIFICATE_SIGNER_COUNT.observe(certificate.signatures().len() as f64);
        }
        let hash = certificate.hash();

        // Lite certificate and block value, both under the block-hash partition.
        let root_key = RootKey::BlockHash(hash).bytes();
        let mut key_values = Vec::new();
        let key = LITE_CERTIFICATE_KEY.to_vec();
        let value = bcs::to_bytes(&certificate.lite_certificate())?;
        #[cfg(with_metrics)]
        metrics::CERTIFICATE_LITE_BYTES.observe(value.len() as f64);
        key_values.push((key, value));
        let key = BLOCK_KEY.to_vec();
        let value = bcs::to_bytes(&certificate.value())?;
        #[cfg(with_metrics)]
        metrics::CERTIFICATE_VALUE_BYTES.observe(value.len() as f64);
        key_values.push((key, value));
        self.put_key_values(root_key, key_values);

        // Index the block hash by (chain, height).
        let chain_id = certificate.value().block().header.chain_id;
        let height = certificate.value().block().header.height;
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let height_key = to_height_key(height);
        let index_value = bcs::to_bytes(&hash)?;
        self.put_key_value(index_root_key, height_key, index_value);

        // Index each event emitted by the block to the block's height.
        let event_index_root_key = RootKey::EventBlockHeight(chain_id).bytes();
        let height_value = bcs::to_bytes(&height)?;
        for event in certificate.value().block().body.events.iter().flatten() {
            let event_key = to_event_key(&EventId {
                chain_id,
                stream_id: event.stream_id.clone(),
                index: event.index,
            });
            self.put_key_value(
                event_index_root_key.clone(),
                event_key,
                height_value.clone(),
            );
        }

        Ok(())
    }

    /// Queues the write of an event's payload into its chain's event partition.
    fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
        #[cfg(with_metrics)]
        metrics::WRITE_EVENT_COUNTER.inc();
        let key = to_event_key(event_id);
        let root_key = RootKey::Event(event_id.chain_id).bytes();
        self.put_key_value(root_key, key, value);
    }

    /// Queues the write of the network description into its singleton partition.
    fn add_network_description(
        &mut self,
        information: &NetworkDescription,
    ) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        metrics::WRITE_NETWORK_DESCRIPTION.inc();
        let root_key = RootKey::NetworkDescription.bytes();
        let key = NETWORK_DESCRIPTION_KEY.to_vec();
        let value = bcs::to_bytes(information)?;
        self.put_key_value(root_key, key, value);
        Ok(())
    }
}
411
/// Configuration of the in-memory storage caches: per-cache capacities and the
/// shared cleanup interval.
#[derive(Clone, Copy, Debug)]
pub struct StorageCacheConfig {
    /// Maximum number of entries in the blob cache.
    pub blob_cache_size: usize,
    /// Maximum number of entries in the confirmed-block cache.
    pub confirmed_block_cache_size: usize,
    /// Maximum number of entries in the (deserialized) certificate cache.
    pub certificate_cache_size: usize,
    /// Maximum number of entries in the raw (serialized) certificate cache.
    pub certificate_raw_cache_size: usize,
    /// Maximum number of entries in the event cache.
    pub event_cache_size: usize,
    /// Interval, in seconds, between cache cleanup passes.
    pub cache_cleanup_interval_secs: u64,
}

/// Default cache configuration used in tests.
#[cfg(with_testing)]
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    blob_cache_size: 1000,
    confirmed_block_cache_size: 1000,
    certificate_cache_size: 1000,
    certificate_raw_cache_size: 1000,
    event_cache_size: 1000,
    cache_cleanup_interval_secs: linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS,
};

/// A certificate in serialized form: `(lite certificate bytes, block bytes)`.
type RawCertificate = (Vec<u8>, Vec<u8>);

/// The set of in-memory caches used by [`DbStorage`].
#[derive(Clone)]
pub struct StorageCaches {
    /// Blobs, keyed by blob ID.
    pub(crate) blob: Arc<ValueCache<BlobId, Blob>>,
    /// Confirmed blocks, keyed by their hash.
    pub(crate) confirmed_block: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Deserialized confirmed-block certificates, keyed by their hash.
    pub(crate) certificate: Arc<ValueCache<CryptoHash, ConfirmedBlockCertificate>>,
    /// Serialized certificates, keyed by their hash.
    pub(crate) certificate_raw: Arc<ValueCache<CryptoHash, RawCertificate>>,
    /// Event payloads, keyed by event ID.
    pub(crate) event: Arc<ValueCache<EventId, Vec<u8>>>,
}
450
451impl StorageCaches {
452 pub fn new(sizes: StorageCacheConfig) -> Self {
454 let interval = sizes.cache_cleanup_interval_secs;
455 Self {
456 blob: Arc::new(ValueCache::new(sizes.blob_cache_size, interval)),
457 confirmed_block: Arc::new(ValueCache::new(sizes.confirmed_block_cache_size, interval)),
458 certificate: Arc::new(ValueCache::new(sizes.certificate_cache_size, interval)),
459 certificate_raw: Arc::new(ValueCache::new(sizes.certificate_raw_cache_size, interval)),
460 event: Arc::new(ValueCache::new(sizes.event_cache_size, interval)),
461 }
462 }
463}
464
/// Main storage implementation over a key-value database, with in-memory
/// caches and a pluggable clock.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying key-value database.
    database: Arc<Database>,
    /// The clock used to obtain timestamps and to sleep.
    clock: Clock,
    /// Thread pool shared with chain runtime contexts.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime to use, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Compiled user contracts, keyed by application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Compiled user services, keyed by application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// Committees shared across chains.
    shared_committees: SharedCommittees,
    /// The in-memory caches.
    caches: StorageCaches,
    /// Configuration for the execution runtime.
    execution_runtime_config: ExecutionRuntimeConfig,
}

/// Partition layout of the database: each variant, BCS-serialized, is the
/// root key of one partition.
#[derive(Debug, Serialize, Deserialize)]
enum RootKey {
    /// Singleton partition holding the network description.
    NetworkDescription,
    /// State of a block exporter.
    BlockExporterState(u32),
    /// The state of one chain.
    ChainState(ChainId),
    /// A certificate and block, keyed by the block hash.
    BlockHash(CryptoHash),
    /// A blob and its state, keyed by the blob ID.
    BlobId(BlobId),
    /// Events of one chain.
    Event(ChainId),
    /// Height -> block-hash index of one chain.
    BlockByHeight(ChainId),
    /// Event -> block-height index of one chain.
    EventBlockHeight(ChainId),
}

// First byte of a serialized `RootKey`, per variant: BCS writes the variant
// index first, so these values must stay in sync with the variant order of
// `RootKey` above.
const CHAIN_ID_TAG: u8 = 2;
const BLOB_ID_TAG: u8 = 4;
const EVENT_ID_TAG: u8 = 5;
494
495impl RootKey {
496 fn bytes(&self) -> Vec<u8> {
497 bcs::to_bytes(self).unwrap()
498 }
499}
500
501#[derive(Debug, Serialize, Deserialize)]
502struct RestrictedEventId {
503 pub stream_id: StreamId,
504 pub index: u32,
505}
506
507fn to_event_key(event_id: &EventId) -> Vec<u8> {
508 let restricted_event_id = RestrictedEventId {
509 stream_id: event_id.stream_id.clone(),
510 index: event_id.index,
511 };
512 bcs::to_bytes(&restricted_event_id).unwrap()
513}
514
/// Builds the storage key indexing a block (or event height) by block height.
///
/// NOTE(review): BCS encodes the underlying integer little-endian, so these
/// keys do not sort lexicographically by height. That is fine for the point
/// lookups done in this file — confirm no caller relies on range scans over
/// this index.
pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
    bcs::to_bytes(&height).unwrap()
}
518
519fn is_chain_state(root_key: &[u8]) -> bool {
520 if root_key.is_empty() {
521 return false;
522 }
523 root_key[0] == CHAIN_ID_TAG
524}
525
526#[derive(Clone, Copy)]
529pub struct ChainStatesFirstAssignment;
530
531impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
532 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
533 if root_key.is_empty() {
534 return Ok(StoreInUse::Second);
535 }
536 let store = match is_chain_state(root_key) {
537 true => StoreInUse::First,
538 false => StoreInUse::Second,
539 };
540 Ok(store)
541 }
542}
543
544#[derive(Clone)]
546pub struct WallClock;
547
548#[cfg_attr(not(web), async_trait)]
549#[cfg_attr(web, async_trait(?Send))]
550impl Clock for WallClock {
551 fn current_time(&self) -> Timestamp {
552 Timestamp::now()
553 }
554
555 async fn sleep_until(&self, timestamp: Timestamp) {
556 let delta = timestamp.delta_since(Timestamp::now());
557 if delta > TimeDelta::ZERO {
558 linera_base::time::timer::sleep(delta.as_duration()).await
559 }
560 }
561}
562
/// Mutable state behind a [`TestClock`].
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current simulated time.
    time: Timestamp,
    /// Pending sleepers keyed by `Reverse(wake_time)`, so that
    /// `split_off(&Reverse(time))` extracts exactly the sleepers whose wake
    /// time is at or before `time`.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional hook consulted on each `sleep_until`; returning `true`
    /// auto-advances the clock to the requested wake time.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}

#[cfg(with_testing)]
impl TestClockInner {
    /// Advances the simulated time and wakes every sleeper due at or before
    /// `time`.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Because keys are `Reverse<Timestamp>`, `split_off` removes exactly
        // the entries with wake time <= `time`.
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            // The receiver may already be dropped; ignore send failures.
            sender.send(()).ok();
        }
    }

    /// Registers a sleeper waking at `time` and returns its receiver.
    ///
    /// Completes immediately when the time has already passed, or when the
    /// sleep callback requests auto-advancing the clock to `time`.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let should_auto_advance = self
            .sleep_callback
            .as_ref()
            .is_some_and(|callback| callback(time));
        if should_auto_advance && time > self.time {
            // Auto-advance: move the clock (waking earlier sleepers), then
            // complete this sleep. This sender is not in `sleeps` yet, so
            // `set` cannot have signaled it already.
            self.set(time);
            sender.send(()).ok();
        } else if self.time >= time {
            // The requested time has already passed: complete immediately.
            sender.send(()).ok();
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}

/// A [`Clock`] implementation with programmatically controlled time, for tests.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);

#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    async fn sleep_until(&self, timestamp: Timestamp) {
        // Bind the receiver first so the mutex guard is dropped *before* the
        // `.await`: holding a `std::sync::MutexGuard` across an await point
        // would block other tasks and make the future non-`Send`.
        let receiver = self.lock().add_sleep_until(timestamp);
        receiver.await.ok();
    }
}
625
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new [`TestClock`] starting at the default (zero) timestamp.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time, waking any sleeper due at or before it.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the clock by `delta`, saturating at the maximum timestamp.
    pub fn add(&self, delta: TimeDelta) {
        let mut guard = self.lock();
        let time = guard.time.saturating_add(delta);
        guard.set(time);
    }

    /// Returns the current simulated time.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Installs a callback invoked on every `sleep_until`; when it returns
    /// `true`, the clock auto-advances to the requested wake time.
    // The redundant `#[cfg(with_testing)]` that was on this method has been
    // removed: the whole `impl` block is already gated on that feature.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        self.lock().sleep_callback = Some(Box::new(callback));
    }

    /// Locks the inner state, panicking if the mutex was poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
666
667#[cfg_attr(not(web), async_trait)]
668#[cfg_attr(web, async_trait(?Send))]
669impl<Database, C> Storage for DbStorage<Database, C>
670where
671 Database: KeyValueDatabase<
672 Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
673 Error: Send + Sync,
674 > + Clone
675 + linera_base::util::traits::AutoTraits
676 + 'static,
677 C: Clock + Clone + Send + Sync + 'static,
678{
679 type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
680 type Clock = C;
681 type BlockExporterContext = ViewContext<u32, Database::Store>;
682
    /// Returns the clock used by this storage.
    fn clock(&self) -> &C {
        &self.clock
    }

    /// Returns the shared thread pool.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }

    /// Returns the committees shared across chains.
    fn shared_committees(&self) -> &SharedCommittees {
        &self.shared_committees
    }
694
    /// Loads the chain state view for `chain_id` from the chain's dedicated
    /// partition.
    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
    async fn load_chain(
        &self,
        chain_id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, ViewError> {
        #[cfg(with_metrics)]
        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
        // The runtime context gives the loaded chain access to this storage,
        // the thread pool, and the cached user contracts/services.
        let runtime_context = ChainRuntimeContext {
            storage: self.clone(),
            thread_pool: self.thread_pool.clone(),
            chain_id,
            execution_runtime_config: self.execution_runtime_config,
            user_contracts: self.user_contracts.clone(),
            user_services: self.user_services.clone(),
        };
        let root_key = RootKey::ChainState(chain_id).bytes();
        // Chain states are accessed through an exclusive store handle.
        let store = self.database.open_exclusive(&root_key)?;
        let context = ViewContext::create_root_context(store, runtime_context).await?;
        ChainStateView::load(context).await
    }
715
716 #[instrument(level = "trace", skip_all, fields(%blob_id))]
717 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
718 if self.caches.blob.contains(&blob_id) {
719 #[cfg(with_metrics)]
720 metrics::CONTAINS_BLOB_COUNTER
721 .with_label_values(&[metrics::CACHE])
722 .inc();
723 return Ok(true);
724 }
725 let root_key = RootKey::BlobId(blob_id).bytes();
726 let store = self.database.open_shared(&root_key)?;
727 let test = store.contains_key(BLOB_KEY).await?;
728 #[cfg(with_metrics)]
729 metrics::CONTAINS_BLOB_COUNTER
730 .with_label_values(&[metrics::DB])
731 .inc();
732 Ok(test)
733 }
734
    /// Returns the subset of `blob_ids` that are neither cached nor present in
    /// the database, preserving the input order.
    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
        let mut missing_blobs = Vec::new();
        #[cfg(with_metrics)]
        let mut cache_hits: u64 = 0;
        #[cfg(with_metrics)]
        let mut db_checks: u64 = 0;
        for blob_id in blob_ids {
            // A cached blob certainly exists; skip the database check.
            if self.caches.blob.contains(blob_id) {
                #[cfg(with_metrics)]
                {
                    cache_hits += 1;
                }
                continue;
            }
            #[cfg(with_metrics)]
            {
                db_checks += 1;
            }
            let root_key = RootKey::BlobId(*blob_id).bytes();
            let store = self.database.open_shared(&root_key)?;
            if !store.contains_key(BLOB_KEY).await? {
                missing_blobs.push(*blob_id);
            }
        }
        // Record how many lookups were served by the cache vs. the database.
        #[cfg(with_metrics)]
        {
            if cache_hits > 0 {
                metrics::CONTAINS_BLOBS_COUNTER
                    .with_label_values(&[metrics::CACHE])
                    .inc_by(cache_hits);
            }
            if db_checks > 0 {
                metrics::CONTAINS_BLOBS_COUNTER
                    .with_label_values(&[metrics::DB])
                    .inc_by(db_checks);
            }
        }
        Ok(missing_blobs)
    }
775
776 #[instrument(skip_all, fields(%blob_id))]
777 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
778 let root_key = RootKey::BlobId(blob_id).bytes();
779 let store = self.database.open_shared(&root_key)?;
780 let test = store.contains_key(BLOB_STATE_KEY).await?;
781 #[cfg(with_metrics)]
782 metrics::CONTAINS_BLOB_STATE_COUNTER
783 .with_label_values(&[metrics::DB])
784 .inc();
785 Ok(test)
786 }
787
    /// Reads the confirmed block with the given hash, consulting the cache
    /// first and caching a successful database read.
    #[instrument(skip_all, fields(%hash))]
    async fn read_confirmed_block(
        &self,
        hash: CryptoHash,
    ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError> {
        if let Some(block) = self.caches.confirmed_block.get(&hash) {
            #[cfg(with_metrics)]
            metrics::READ_CONFIRMED_BLOCK_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(Some(block));
        }
        let root_key = RootKey::BlockHash(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let value = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await?;
        #[cfg(with_metrics)]
        metrics::READ_CONFIRMED_BLOCK_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        match value {
            // Cache the block and hand out the shared handle.
            Some(block) => Ok(Some(self.caches.confirmed_block.insert(&hash, block))),
            None => Ok(None),
        }
    }
812
    /// Reads a batch of confirmed blocks by hash, serving what it can from the
    /// cache and fetching (then caching) the rest from the database.
    /// The result preserves the input order; unknown hashes yield `None`.
    #[instrument(skip_all)]
    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError> {
        let hashes = hashes.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        let mut results = vec![None; hashes.len()];
        let mut misses = Vec::new();
        // First pass: fill from the cache and remember the missing positions.
        for (i, hash) in hashes.iter().enumerate() {
            if let Some(block) = self.caches.confirmed_block.get(hash) {
                results[i] = Some(block);
            } else {
                misses.push(i);
            }
        }
        // Second pass: fetch the misses from their database partitions.
        if !misses.is_empty() {
            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
                let store = self.database.open_shared(&root_key)?;
                if let Some(block) = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await? {
                    results[*miss_idx] = Some(
                        self.caches
                            .confirmed_block
                            .insert(&hashes[*miss_idx], block),
                    );
                }
            }
        }
        #[cfg(with_metrics)]
        {
            let cache_hits = (hashes.len() - misses.len()) as u64;
            if cache_hits > 0 {
                metrics::READ_CONFIRMED_BLOCKS_COUNTER
                    .with_label_values(&[metrics::CACHE])
                    .inc_by(cache_hits);
            }
            // Counts database lookups, including those that found nothing.
            let db_reads = misses.len() as u64;
            if db_reads > 0 {
                metrics::READ_CONFIRMED_BLOCKS_COUNTER
                    .with_label_values(&[metrics::DB])
                    .inc_by(db_reads);
            }
        }
        Ok(results)
    }
862
    /// Reads the blob with the given ID, consulting the cache first and
    /// caching a successful database read.
    #[instrument(skip_all, fields(%blob_id))]
    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError> {
        if let Some(blob) = self.caches.blob.get(&blob_id) {
            #[cfg(with_metrics)]
            metrics::READ_BLOB_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(Some(blob));
        }
        let root_key = RootKey::BlobId(blob_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
        #[cfg(with_metrics)]
        metrics::READ_BLOB_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        match maybe_blob_bytes {
            Some(blob_bytes) => {
                // The bytes were stored under this blob's own partition, so
                // the ID is taken as trusted without re-verification.
                let blob = Blob::new_with_id_unchecked(blob_id, blob_bytes);
                Ok(Some(self.caches.blob.insert(&blob_id, blob)))
            }
            None => Ok(None),
        }
    }
887
888 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
889 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError> {
890 if blob_ids.is_empty() {
891 return Ok(Vec::new());
892 }
893 futures::future::try_join_all(blob_ids.iter().map(|blob_id| self.read_blob(*blob_id))).await
899 }
900
901 #[instrument(skip_all, fields(%blob_id))]
902 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
903 let root_key = RootKey::BlobId(blob_id).bytes();
904 let store = self.database.open_shared(&root_key)?;
905 let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
906 #[cfg(with_metrics)]
907 metrics::READ_BLOB_STATE_COUNTER
908 .with_label_values(&[metrics::DB])
909 .inc();
910 Ok(blob_state)
911 }
912
913 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
914 async fn read_blob_states(
915 &self,
916 blob_ids: &[BlobId],
917 ) -> Result<Vec<Option<BlobState>>, ViewError> {
918 if blob_ids.is_empty() {
919 return Ok(Vec::new());
920 }
921 futures::future::try_join_all(
922 blob_ids
923 .iter()
924 .map(|blob_id| self.read_blob_state(*blob_id)),
925 )
926 .await
927 }
928
929 #[instrument(skip_all, fields(blob_id = %blob.id()))]
930 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
931 let mut batch = MultiPartitionBatch::new();
932 batch.add_blob(blob);
933 self.write_batch(batch).await?;
934 Ok(())
935 }
936
    /// Writes `blob_state` for each of the given blob IDs, except where a
    /// state with an equal or newer epoch is already stored.
    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
    async fn maybe_write_blob_states(
        &self,
        blob_ids: &[BlobId],
        blob_state: BlobState,
    ) -> Result<(), ViewError> {
        if blob_ids.is_empty() {
            return Ok(());
        }
        // First read the currently stored state (if any) of every blob.
        let mut maybe_blob_states = Vec::new();
        for blob_id in blob_ids {
            let root_key = RootKey::BlobId(*blob_id).bytes();
            let store = self.database.open_shared(&root_key)?;
            let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
            maybe_blob_states.push(maybe_blob_state);
        }
        // Then batch a write for every blob whose stored state is absent or
        // has an older epoch.
        // NOTE(review): this read-then-write is not atomic; a concurrent
        // writer could interleave between the reads and the batch write —
        // confirm callers serialize updates per blob.
        let mut batch = MultiPartitionBatch::new();
        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
            match maybe_blob_state {
                None => {
                    batch.add_blob_state(*blob_id, &blob_state)?;
                }
                Some(state) => {
                    if state.epoch < blob_state.epoch {
                        batch.add_blob_state(*blob_id, &blob_state)?;
                    }
                }
            }
        }
        self.write_batch(batch).await?;
        Ok(())
    }
972
    /// Writes each blob only if a blob state already exists for it.
    ///
    /// Returns, for each input blob in order, whether a state was present
    /// (and hence whether that blob was written).
    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
        if blobs.is_empty() {
            return Ok(Vec::new());
        }
        let mut batch = MultiPartitionBatch::new();
        let mut blob_states = Vec::new();
        for blob in blobs {
            let root_key = RootKey::BlobId(blob.id()).bytes();
            let store = self.database.open_shared(&root_key)?;
            let has_state = store.contains_key(BLOB_STATE_KEY).await?;
            blob_states.push(has_state);
            if has_state {
                batch.add_blob(blob);
            }
        }
        self.write_batch(batch).await?;
        Ok(blob_states)
    }
992
993 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
994 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
995 if blobs.is_empty() {
996 return Ok(());
997 }
998 let mut batch = MultiPartitionBatch::new();
999 for blob in blobs {
1000 batch.add_blob(blob);
1001 }
1002 self.write_batch(batch).await
1003 }
1004
1005 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1006 async fn write_blobs_and_certificate(
1007 &self,
1008 blobs: &[Blob],
1009 certificate: &ConfirmedBlockCertificate,
1010 ) -> Result<(), ViewError> {
1011 let mut batch = MultiPartitionBatch::new();
1012 for blob in blobs {
1013 batch.add_blob(blob);
1014 }
1015 batch.add_certificate(certificate)?;
1016 self.write_batch(batch).await
1017 }
1018
    /// Inserts a certificate into the certificate cache and returns the shared
    /// handle.
    fn cache_certificate(
        &self,
        certificate: ConfirmedBlockCertificate,
    ) -> Arc<ConfirmedBlockCertificate> {
        self.caches
            .certificate
            .insert(&certificate.hash(), certificate)
    }

    /// Inserts a blob into the blob cache and returns the shared handle.
    fn cache_blob(&self, blob: Blob) -> Arc<Blob> {
        self.caches.blob.insert(&blob.id(), blob)
    }

    /// Inserts a confirmed block into the block cache and returns the shared
    /// handle.
    fn cache_confirmed_block(&self, block: ConfirmedBlock) -> Arc<ConfirmedBlock> {
        self.caches.confirmed_block.insert(&block.hash(), block)
    }
1035
    /// Tests whether a full certificate (lite certificate *and* block) is
    /// stored for the given hash, checking both caches first.
    #[instrument(skip_all, fields(%hash))]
    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
        if self.caches.certificate.contains(&hash) || self.caches.certificate_raw.contains(&hash) {
            #[cfg(with_metrics)]
            metrics::CONTAINS_CERTIFICATE_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(true);
        }
        let root_key = RootKey::BlockHash(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let results = store.contains_keys(&get_block_keys()).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_CERTIFICATE_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        // Both the lite certificate and the block must be present.
        Ok(results[0] && results[1])
    }
1054
    /// Reads the certificate with the given hash, trying three tiers in
    /// order: the deserialized-certificate cache, the raw-bytes cache, and
    /// finally the database (whose result is inserted into the raw cache).
    #[instrument(skip_all, fields(%hash))]
    async fn read_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
        if let Some(cert) = self.caches.certificate.get(&hash) {
            #[cfg(with_metrics)]
            metrics::READ_CERTIFICATE_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(Some(cert));
        }
        if let Some(raw) = self.caches.certificate_raw.get(&hash) {
            #[cfg(with_metrics)]
            metrics::READ_CERTIFICATE_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return self.deserialize_and_cache_certificate(&raw.0, &raw.1);
        }
        let root_key = RootKey::BlockHash(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let values = store.read_multi_values_bytes(&get_block_keys()).await?;
        #[cfg(with_metrics)]
        metrics::READ_CERTIFICATE_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        // Both the lite certificate and the block bytes must be present.
        let Some(lite_cert_bytes) = values[0].as_ref() else {
            return Ok(None);
        };
        let Some(confirmed_block_bytes) = values[1].as_ref() else {
            return Ok(None);
        };
        self.caches.certificate_raw.insert(
            &hash,
            (lite_cert_bytes.clone(), confirmed_block_bytes.clone()),
        );
        self.deserialize_and_cache_certificate(lite_cert_bytes, confirmed_block_bytes)
    }
1096
1097 #[instrument(skip_all)]
1098 async fn read_certificates(
1099 &self,
1100 hashes: &[CryptoHash],
1101 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1102 let raw_certs = self.read_certificates_raw(hashes).await?;
1103
1104 raw_certs
1105 .into_iter()
1106 .map(|maybe_raw| {
1107 let Some(raw) = maybe_raw else {
1108 return Ok(None);
1109 };
1110 self.deserialize_and_cache_certificate(&raw.0, &raw.1)
1111 })
1112 .collect()
1113 }
1114
    /// Reads the raw (serialized) certificates for the given hashes, serving
    /// what it can from the raw cache and fetching (then caching) the rest
    /// from the database. The result preserves input order; unknown hashes
    /// yield `None`.
    #[instrument(skip_all)]
    async fn read_certificates_raw(
        &self,
        hashes: &[CryptoHash],
    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        let mut results = vec![None; hashes.len()];
        let mut misses = Vec::new();
        // First pass: fill from the raw cache and remember missing positions.
        for (i, hash) in hashes.iter().enumerate() {
            if let Some(raw) = self.caches.certificate_raw.get(hash) {
                results[i] = Some(raw);
            } else {
                misses.push(i);
            }
        }
        // Second pass: fetch misses from their block-hash partitions; a hash
        // only counts as found when both lite certificate and block exist.
        if !misses.is_empty() {
            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
                let store = self.database.open_shared(&root_key)?;
                let values = store.read_multi_values_bytes(&get_block_keys()).await?;
                if let (Some(lite), Some(block)) = (values[0].as_ref(), values[1].as_ref()) {
                    results[*miss_idx] = Some(
                        self.caches
                            .certificate_raw
                            .insert(&hashes[*miss_idx], (lite.clone(), block.clone())),
                    );
                }
            }
        }
        #[cfg(with_metrics)]
        {
            let cache_hits = (hashes.len() - misses.len()) as u64;
            if cache_hits > 0 {
                metrics::READ_CERTIFICATES_COUNTER
                    .with_label_values(&[metrics::CACHE])
                    .inc_by(cache_hits);
            }
            // Counts database lookups, including those that found nothing.
            let db_reads = misses.len() as u64;
            if db_reads > 0 {
                metrics::READ_CERTIFICATES_COUNTER
                    .with_label_values(&[metrics::DB])
                    .inc_by(db_reads);
            }
        }
        Ok(results)
    }
1164
1165 async fn read_certificate_hashes_by_heights(
1166 &self,
1167 chain_id: ChainId,
1168 heights: &[BlockHeight],
1169 ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1170 if heights.is_empty() {
1171 return Ok(Vec::new());
1172 }
1173
1174 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1175 let store = self.database.open_shared(&index_root_key)?;
1176 let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
1177 let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
1178 let hash_options: Vec<Option<CryptoHash>> = hash_bytes
1179 .into_iter()
1180 .map(|opt| {
1181 opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
1182 .transpose()
1183 })
1184 .collect::<Result<_, _>>()?;
1185
1186 Ok(hash_options)
1187 }
1188
1189 async fn read_event_block_heights(
1190 &self,
1191 event_ids: &[EventId],
1192 ) -> Result<Vec<Option<BlockHeight>>, ViewError> {
1193 if event_ids.is_empty() {
1194 return Ok(Vec::new());
1195 }
1196 let mut chain_groups = BTreeMap::<_, Vec<_>>::new();
1198 for (i, event_id) in event_ids.iter().enumerate() {
1199 chain_groups
1200 .entry(event_id.chain_id)
1201 .or_default()
1202 .push((i, to_event_key(event_id)));
1203 }
1204 let mut results = vec![None; event_ids.len()];
1205 for (chain_id, entries) in chain_groups {
1206 let root_key = RootKey::EventBlockHeight(chain_id).bytes();
1207 let store = self.database.open_shared(&root_key)?;
1208 let keys = entries
1209 .iter()
1210 .map(|(_, key)| key.clone())
1211 .collect::<Vec<_>>();
1212 let values = store.read_multi_values_bytes(&keys).await?;
1213 for ((original_index, _), value) in entries.into_iter().zip(values) {
1214 if let Some(bytes) = value {
1215 results[original_index] = Some(bcs::from_bytes::<BlockHeight>(&bytes)?);
1216 }
1217 }
1218 }
1219 Ok(results)
1220 }
1221
    /// Fetches the raw certificate byte pairs for the given heights of one chain.
    ///
    /// Heights are first resolved to hashes, the hashes are deduplicated so each
    /// certificate is fetched at most once, and the results are fanned back out
    /// to every input position that referenced them.
    #[instrument(skip_all)]
    async fn read_certificates_by_heights_raw(
        &self,
        chain_id: ChainId,
        heights: &[BlockHeight],
    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
        let hashes: Vec<Option<CryptoHash>> = self
            .read_certificate_hashes_by_heights(chain_id, heights)
            .await?;

        // Map each distinct hash to every input position that requested it
        // (duplicate heights share one fetch).
        let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
        for (index, maybe_hash) in hashes.iter().enumerate() {
            if let Some(hash) = maybe_hash {
                indices.entry(*hash).or_default().push(index);
            }
        }

        let unique_hashes = indices.keys().copied().collect::<Vec<_>>();

        let mut result = vec![None; heights.len()];

        // `read_certificates_raw` preserves order, so zipping with
        // `unique_hashes` pairs each raw cert with its hash.
        for (raw_cert, hash) in self
            .read_certificates_raw(&unique_hashes)
            .await?
            .into_iter()
            .zip(unique_hashes)
        {
            if let Some(idx_list) = indices.get(&hash) {
                for &index in idx_list {
                    result[index] = raw_cert.clone();
                }
            } else {
                // Defensive: every element of `unique_hashes` came from
                // `indices`, so this branch should be unreachable.
                tracing::error!(?hash, "certificate hash not found in indices map",);
            }
        }

        Ok(result)
    }
1263
1264 #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1265 async fn read_certificates_by_heights(
1266 &self,
1267 chain_id: ChainId,
1268 heights: &[BlockHeight],
1269 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1270 self.read_certificates_by_heights_raw(chain_id, heights)
1271 .await?
1272 .into_iter()
1273 .map(|maybe_raw| match maybe_raw {
1274 None => Ok(None),
1275 Some(raw) => self.deserialize_and_cache_certificate(&raw.0, &raw.1),
1276 })
1277 .collect()
1278 }
1279
    /// Reads a single event's payload, serving from the event cache when
    /// possible and otherwise reading the chain's event partition (and caching
    /// the result). Returns `None` if the event does not exist.
    #[instrument(skip_all, fields(event_id = ?event_id))]
    async fn read_event(&self, event_id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError> {
        // Fast path: in-memory cache.
        if let Some(event) = self.caches.event.get(&event_id) {
            #[cfg(with_metrics)]
            metrics::READ_EVENT_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(Some(event));
        }
        // Slow path: read from the per-chain event partition.
        let event_key = to_event_key(&event_id);
        let root_key = RootKey::Event(event_id.chain_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let event = store.read_value_bytes(&event_key).await?;
        #[cfg(with_metrics)]
        metrics::READ_EVENT_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        match event {
            // Populate the cache on a successful DB read.
            Some(event_bytes) => Ok(Some(self.caches.event.insert(&event_id, event_bytes))),
            None => Ok(None),
        }
    }
1302
    /// Returns whether the given event exists, checking the event cache first
    /// and falling back to a key-existence check in the chain's event partition.
    #[instrument(skip_all, fields(event_id = ?event_id))]
    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
        // Cache hit proves existence; a cache miss proves nothing, so fall
        // through to the database.
        if self.caches.event.contains(&event_id) {
            #[cfg(with_metrics)]
            metrics::CONTAINS_EVENT_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(true);
        }
        let event_key = to_event_key(&event_id);
        let root_key = RootKey::Event(event_id.chain_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let exists = store.contains_key(&event_key).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_EVENT_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        Ok(exists)
    }
1322
1323 #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
1324 async fn read_events_from_index(
1325 &self,
1326 chain_id: &ChainId,
1327 stream_id: &StreamId,
1328 start_index: u32,
1329 ) -> Result<Vec<IndexAndEvent>, ViewError> {
1330 let root_key = RootKey::Event(*chain_id).bytes();
1331 let store = self.database.open_shared(&root_key)?;
1332 let mut keys = Vec::new();
1333 let mut indices = Vec::new();
1334 let prefix = bcs::to_bytes(stream_id).unwrap();
1335 for short_key in store.find_keys_by_prefix(&prefix).await? {
1336 let index = bcs::from_bytes::<u32>(&short_key)?;
1337 if index >= start_index {
1338 let mut key = prefix.clone();
1339 key.extend(short_key);
1340 keys.push(key);
1341 indices.push(index);
1342 }
1343 }
1344 let values = store.read_multi_values_bytes(&keys).await?;
1345 let mut returned_values = Vec::new();
1346 for (index, value) in indices.into_iter().zip(values) {
1347 let event = value.unwrap();
1348 returned_values.push(IndexAndEvent { index, event });
1349 }
1350 Ok(returned_values)
1351 }
1352
1353 #[instrument(skip_all)]
1354 async fn write_events(
1355 &self,
1356 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1357 ) -> Result<(), ViewError> {
1358 let mut batch = MultiPartitionBatch::new();
1359 for (event_id, value) in events {
1360 batch.add_event(&event_id, value);
1361 }
1362 self.write_batch(batch).await
1363 }
1364
1365 #[instrument(skip_all)]
1366 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1367 let root_key = RootKey::NetworkDescription.bytes();
1368 let store = self.database.open_shared(&root_key)?;
1369 let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1370 #[cfg(with_metrics)]
1371 metrics::READ_NETWORK_DESCRIPTION
1372 .with_label_values(&[metrics::DB])
1373 .inc();
1374 Ok(maybe_value)
1375 }
1376
1377 #[instrument(skip_all)]
1378 async fn write_network_description(
1379 &self,
1380 information: &NetworkDescription,
1381 ) -> Result<(), ViewError> {
1382 let mut batch = MultiPartitionBatch::new();
1383 batch.add_network_description(information)?;
1384 self.write_batch(batch).await?;
1385 Ok(())
1386 }
1387
    /// Returns the WASM runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1391
    /// Creates a root view context over the block exporter's dedicated state
    /// partition. The partition is opened exclusively (unlike the shared
    /// partitions used elsewhere in this impl).
    #[instrument(skip_all)]
    async fn block_exporter_context(
        &self,
        block_exporter_id: u32,
    ) -> Result<Self::BlockExporterContext, ViewError> {
        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
        let store = self.database.open_exclusive(&root_key)?;
        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
    }
1401
1402 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1403 let root_keys = self.database.list_root_keys().await?;
1404 let mut blob_ids = Vec::new();
1405 for root_key in root_keys {
1406 if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1407 let root_key_red = &root_key[1..];
1408 let blob_id = bcs::from_bytes(root_key_red)?;
1409 blob_ids.push(blob_id);
1410 }
1411 }
1412 Ok(blob_ids)
1413 }
1414
1415 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1416 let root_keys = self.database.list_root_keys().await?;
1417 let mut chain_ids = Vec::new();
1418 for root_key in root_keys {
1419 if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1420 let root_key_red = &root_key[1..];
1421 let chain_id = bcs::from_bytes(root_key_red)?;
1422 chain_ids.push(chain_id);
1423 }
1424 }
1425 Ok(chain_ids)
1426 }
1427
    /// Lists every event id in storage: for each root key tagged with
    /// `EVENT_ID_TAG` (tag byte followed by the bcs `ChainId`), decodes every
    /// key of that partition as a `RestrictedEventId` (stream id + index).
    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
        let root_keys = self.database.list_root_keys().await?;
        let mut event_ids = Vec::new();
        for root_key in root_keys {
            if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
                let root_key_red = &root_key[1..];
                let chain_id = bcs::from_bytes(root_key_red)?;
                // Enumerate every key in this chain's event partition.
                let store = self.database.open_shared(&root_key)?;
                let keys = store.find_keys_by_prefix(&[]).await?;
                for key in keys {
                    let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
                    let event_id = EventId {
                        chain_id,
                        stream_id: restricted_event_id.stream_id,
                        index: restricted_event_id.index,
                    };
                    event_ids.push(event_id);
                }
            }
        }
        Ok(event_ids)
    }
1450}
1451
impl<Database, C> DbStorage<Database, C>
where
    Database: KeyValueDatabase + Clone,
    Database::Store: KeyValueStore + Clone,
    C: Clock,
    Database::Error: Send + Sync,
{
    /// Maps each certificate hash to the root key of its `BlockHash` partition.
    #[instrument(skip_all)]
    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
        hashes
            .iter()
            .map(|hash| RootKey::BlockHash(*hash).bytes())
            .collect()
    }

    /// Deserializes a certificate from its two raw parts (lite certificate and
    /// confirmed block), populating the confirmed-block and certificate caches
    /// along the way.
    ///
    /// Returns `ViewError::InconsistentEntries` if the lite certificate does
    /// not match the block (i.e. `with_value` rejects the pair).
    fn deserialize_and_cache_certificate(
        &self,
        lite_cert_bytes: &[u8],
        confirmed_block_bytes: &[u8],
    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
        let lite = bcs::from_bytes::<LiteCertificate>(lite_cert_bytes)?;
        let block = bcs::from_bytes::<ConfirmedBlock>(confirmed_block_bytes)?;
        let hash = block.hash();
        self.caches.confirmed_block.insert(&hash, block.clone());
        let certificate = lite
            .with_value(block)
            .ok_or(ViewError::InconsistentEntries)?;
        let arc = self.caches.certificate.insert(&hash, certificate);
        Ok(Some(arc))
    }

    /// Writes the given key/value pairs to one store as a single batch.
    #[instrument(skip_all)]
    async fn write_entry(
        store: &Database::Store,
        key_values: Vec<(Vec<u8>, Vec<u8>)>,
    ) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        for (key, value) in key_values {
            batch.put_key_value_bytes(key, value);
        }
        store.write_batch(batch).await?;
        Ok(())
    }

    /// Commits a multi-partition batch: one store batch per root key, all
    /// partitions written concurrently via `try_join_all`.
    ///
    /// NOTE(review): writes across partitions are not atomic as a whole; a
    /// failure can leave some partitions written and others not — confirm that
    /// callers tolerate partial writes.
    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
    async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
        if batch.keys_value_bytes.is_empty() {
            return Ok(());
        }
        let mut futures = Vec::new();
        for (root_key, key_values) in batch.keys_value_bytes {
            let store = self.database.open_shared(&root_key)?;
            futures.push(async move { Self::write_entry(&store, key_values).await });
        }
        futures::future::try_join_all(futures).await?;
        Ok(())
    }
}
1510
impl<Database, C> DbStorage<Database, C> {
    /// Assembles a `DbStorage` around `database` with the given WASM runtime,
    /// cache configuration and clock; all other fields get default/empty state.
    fn new(
        database: Database,
        wasm_runtime: Option<WasmRuntime>,
        cache_sizes: StorageCacheConfig,
        clock: C,
    ) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            // Execution thread pool with 20 worker threads.
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            shared_committees: SharedCommittees::new(),
            caches: StorageCaches::new(cache_sizes),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Builder-style setter enabling or disabling application logs in the
    /// execution runtime configuration.
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1539
impl<Database> DbStorage<Database, WallClock>
where
    Database: KeyValueDatabase + Clone + 'static,
    Database::Error: Send + Sync,
    Database::Store: KeyValueStore + Clone + 'static,
{
    /// Connects to `namespace`, creating it first if it does not exist yet,
    /// and wraps the database in a `DbStorage` driven by the wall clock.
    pub async fn maybe_create_and_connect(
        config: &Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        cache_sizes: StorageCacheConfig,
    ) -> Result<Self, Database::Error> {
        let database = Database::maybe_create_and_connect(config, namespace).await?;
        Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
    }

    /// Connects to an existing `namespace` and wraps the database in a
    /// `DbStorage` driven by the wall clock.
    pub async fn connect(
        config: &Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        cache_sizes: StorageCacheConfig,
    ) -> Result<Self, Database::Error> {
        let database = Database::connect(config, namespace).await?;
        Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
    }
}
1566
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a storage over a fresh test namespace with a new `TestClock`,
    /// panicking on any setup failure (tests only).
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        DbStorage::<Database, TestClock>::new_for_testing(
            config,
            &namespace,
            wasm_runtime,
            TestClock::new(),
        )
        .await
        .unwrap()
    }

    /// Recreates the given namespace (dropping any existing data), connects to
    /// it, and builds the storage with the default cache configuration.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        Ok(Self::new(
            database,
            wasm_runtime,
            DEFAULT_STORAGE_CACHE_CONFIG,
            clock,
        ))
    }
}
1602
1603#[cfg(test)]
1604mod tests {
1605 use linera_base::{
1606 crypto::{CryptoHash, TestString},
1607 data_types::{BlockHeight, Epoch, Round, Timestamp},
1608 identifiers::{
1609 ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
1610 StreamName,
1611 },
1612 };
1613 use linera_chain::{
1614 block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
1615 types::ConfirmedBlockCertificate,
1616 };
1617 use linera_views::{
1618 memory::MemoryDatabase,
1619 store::{KeyValueDatabase, ReadableKeyValueStore as _},
1620 };
1621
1622 use crate::{
1623 db_storage::{
1624 to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
1625 EVENT_ID_TAG,
1626 },
1627 DbStorage, Storage, TestClock,
1628 };
1629
    /// A blob root key is the tag byte followed by the bcs-encoded `BlobId`.
    #[test]
    fn test_root_key_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let root_key = RootKey::BlobId(blob_id).bytes();
        assert_eq!(root_key[0], BLOB_ID_TAG);
        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
    }
1645
    /// A chain-state root key is the tag byte followed by the bcs-encoded
    /// `ChainId`.
    #[test]
    fn test_root_key_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let root_key = RootKey::ChainState(chain_id).bytes();
        assert_eq!(root_key[0], CHAIN_ID_TAG);
        assert_eq!(
            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
            chain_id
        );
    }
1659
    /// Event root keys carry `EVENT_ID_TAG`, and an event key starts with the
    /// bcs-encoded stream id of its `EventId`.
    #[test]
    fn test_root_key_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        // NOTE(review): this bcs-encodes the &str (adding a length prefix)
        // rather than using the raw UTF-8 bytes — presumably fine for a test
        // name; confirm against how real stream names are constructed.
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let prefix = bcs::to_bytes(&stream_id).unwrap();

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        let root_key = RootKey::Event(chain_id).bytes();
        assert_eq!(root_key[0], EVENT_ID_TAG);
        // The event key must be addressable by its stream prefix.
        let key = to_event_key(&event_id);
        assert!(key.starts_with(&prefix));
    }
1687
1688 #[test]
1691 fn test_root_key_block_by_height_serialization() {
1692 use linera_base::data_types::BlockHeight;
1693
1694 let hash = CryptoHash::default();
1695 let chain_id = ChainId(hash);
1696 let height = BlockHeight(42);
1697
1698 let root_key = RootKey::BlockByHeight(chain_id).bytes();
1700 let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
1701 assert_eq!(deserialized_chain_id, chain_id);
1702
1703 let height_key = to_height_key(height);
1705 let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
1706 assert_eq!(deserialized_height, height);
1707 }
1708
    /// Writing a certificate must also create the per-chain height → hash
    /// index entry used by `read_certificates_by_heights`.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Build a minimal confirmed block at height 5 for a fresh chain.
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: CryptoHash::new(&TestString::new("transactions_hash")),
                messages_hash: CryptoHash::new(&TestString::new("messages_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_msg_blocks_hash",
                )),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_event_blocks_hash",
                )),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_responses_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("operation_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);

        // Commit the certificate through the multi-partition batch path.
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Read the index partition directly and verify the height maps to the
        // certificate's hash.
        let hash = certificate.hash();
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();

        assert!(value_bytes.is_some(), "Height index was not created");
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
    }
1770
    /// `read_certificates_by_heights` must honor request order, return `None`
    /// for missing heights, deduplicate repeated heights, and handle empty
    /// input.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        let mut batch = MultiPartitionBatch::new();
        let mut expected_certs = vec![];

        // Write certificates at heights 1, 3 and 5.
        // NOTE(review): the "{height}" in these labels is a literal — these are
        // not format strings, so every iteration uses the same label. The block
        // hashes still differ because the `height` header field differs.
        for height in [1, 3, 5] {
            let block = Block {
                header: BlockHeader {
                    chain_id,
                    epoch: Epoch::ZERO,
                    height: BlockHeight(height),
                    timestamp: Timestamp::from(0),
                    state_hash: CryptoHash::new(&TestString::new("state_hash_{height}")),
                    previous_block_hash: None,
                    authenticated_owner: None,
                    transactions_hash: CryptoHash::new(&TestString::new("tx_hash_{height}")),
                    messages_hash: CryptoHash::new(&TestString::new("msg_hash_{height}")),
                    previous_message_blocks_hash: CryptoHash::new(&TestString::new(
                        "pmb_hash_{height}",
                    )),
                    previous_event_blocks_hash: CryptoHash::new(&TestString::new(
                        "peb_hash_{height}",
                    )),
                    oracle_responses_hash: CryptoHash::new(&TestString::new(
                        "oracle_hash_{height}",
                    )),
                    events_hash: CryptoHash::new(&TestString::new("events_hash_{height}")),
                    blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_{height}")),
                    operation_results_hash: CryptoHash::new(&TestString::new(
                        "op_results_hash_{height}",
                    )),
                },
                body: BlockBody {
                    transactions: vec![],
                    messages: vec![],
                    previous_message_blocks: Default::default(),
                    previous_event_blocks: Default::default(),
                    oracle_responses: vec![],
                    events: vec![],
                    blobs: vec![],
                    operation_results: vec![],
                },
            };
            let confirmed_block = ConfirmedBlock::new(block);
            let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
            expected_certs.push((height, cert.clone()));
            batch.add_certificate(&cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();

        // Ascending request order returns matching certificates in order.
        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );

        // The result order follows the request order, not storage order.
        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );

        // Missing heights yield `None`; duplicated heights share one hash.
        let heights = vec![
            BlockHeight(1),
            BlockHeight(2),
            BlockHeight(3),
            BlockHeight(3),
        ];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 4); assert!(result[0].is_some());
        assert!(result[1].is_none()); assert!(result[2].is_some());
        assert!(result[3].is_some());
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            result[3].as_ref().unwrap().hash()
        ); let heights = vec![];
        // An empty request returns an empty result.
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }
1895
    /// The height index is partitioned per chain: the same height on two chains
    /// resolves to each chain's own certificate, and a height absent on one
    /// chain yields `None` even if present on another.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_multiple_chains() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));

        let mut batch = MultiPartitionBatch::new();

        // Chain A: a certificate at height 10.
        let block_a = Block {
            header: BlockHeader {
                chain_id: chain_a,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash_a")),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_a")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash_a")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_a")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_a")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_a")),
                events_hash: CryptoHash::new(&TestString::new("events_hash_a")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_a")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_a")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block_a = ConfirmedBlock::new(block_a);
        let cert_a = ConfirmedBlockCertificate::new(confirmed_block_a, Round::Fast, vec![]);
        batch.add_certificate(&cert_a).unwrap();

        // Chain B: a different certificate at the same height 10.
        let block_b = Block {
            header: BlockHeader {
                chain_id: chain_b,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash_b")),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_b")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash_b")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_b")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_b")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_b")),
                events_hash: CryptoHash::new(&TestString::new("events_hash_b")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_b")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_b")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block_b = ConfirmedBlock::new(block_b);
        let cert_b = ConfirmedBlockCertificate::new(confirmed_block_b, Round::Fast, vec![]);
        batch.add_certificate(&cert_b).unwrap();

        storage.write_batch(batch).await.unwrap();

        // Each chain resolves height 10 to its own certificate.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

        // A height with no certificate on this chain yields `None`.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
            .await
            .unwrap();
        assert!(result[0].is_none());
    }
1996
    /// Reading a certificate by hash and by height must return the same
    /// certificate (same hash, same block header).
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_consistency() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write one certificate at height 7.
        let mut batch = MultiPartitionBatch::new();
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height: BlockHeight(7),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
        let hash = cert.hash();
        batch.add_certificate(&cert).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Fetch the same certificate through both read paths.
        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();

        let certs_by_height = storage
            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
            .await
            .unwrap();
        let cert_by_height = certs_by_height[0].as_ref().unwrap();

        // Both paths must agree on hash and header contents.
        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
        assert_eq!(
            cert_by_hash.value().block().header,
            cert_by_height.value().block().header
        );
    }
2057}