1use std::{
5 collections::{BTreeMap, HashMap},
6 fmt::Debug,
7 sync::{Arc, OnceLock},
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_cache::{Arc as CacheArc, ValueCache};
19use linera_chain::{
20 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
21 ChainStateView,
22};
23use linera_execution::{
24 BlobState, ExecutionRuntimeConfig, SharedCommittees, UserContractCode, UserServiceCode,
25 WasmRuntime,
26};
27use linera_views::{
28 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
29 batch::Batch,
30 context::ViewContext,
31 store::{
32 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
33 },
34 views::View,
35 ViewError,
36};
37use serde::{Deserialize, Serialize};
38use tracing::{debug, instrument};
39#[cfg(with_testing)]
40use {
41 futures::channel::oneshot::{self, Receiver},
42 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
43 std::cmp::Reverse,
44};
45
46use crate::{ChainRuntimeContext, Clock, Storage};
47
/// Prometheus metrics recorded by the storage layer.
///
/// Counters that carry the [`SOURCE_LABEL`](metrics::SOURCE_LABEL) label are incremented with
/// either [`CACHE`](metrics::CACHE) or [`DB`](metrics::DB) as the label value, depending on
/// whether the request was answered from an in-memory cache or from the database.
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, linear_bucket_interval,
        register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
    };
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Name of the label that records where a request was served from.
    pub(super) const SOURCE_LABEL: &str = "source";
    /// Value of [`SOURCE_LABEL`] used when a request is answered from a cache.
    pub(super) const CACHE: &str = "cache";
    /// Value of [`SOURCE_LABEL`] used when a request goes to the database.
    pub(super) const DB: &str = "db";

    /// Counts blob existence checks, labelled by source.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts per-blob existence checks performed by batched lookups, labelled by source.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts blob-state existence checks, labelled by source.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts certificate existence checks, labelled by source.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts single confirmed-block reads, labelled by source.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts confirmed blocks requested through batched reads, labelled by source.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// Counts blob reads, labelled by source.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts blob-state reads, labelled by source.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts blobs added to a write batch.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_blob",
            "The metric counting how often a blob is written to storage",
        )
    });

    /// Counts single certificate reads, labelled by source.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts certificates requested through batched reads, labelled by source.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificate are read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts certificates added to a write batch.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
        )
    });

    /// Histogram of the serialized lite-certificate size, observed when a certificate is added
    /// to a write batch.
    // NOTE(review): bucket bounds presumably span 128 B to 2 MiB; confirm against
    // `exponential_bucket_interval`.
    pub(super) static CERTIFICATE_LITE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_lite_bytes",
            "Serialized size of the lite-certificate (signatures + metadata) in bytes",
            exponential_bucket_interval(128.0, 2_097_152.0),
        )
    });

    /// Histogram of the serialized certificate value size, observed when a certificate is added
    /// to a write batch.
    // NOTE(review): bucket bounds presumably span 256 B to 16 MiB; confirm against
    // `exponential_bucket_interval`.
    pub(super) static CERTIFICATE_VALUE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_value_bytes",
            "Serialized size of the certificate value (block payload) in bytes",
            exponential_bucket_interval(256.0, 16_777_216.0),
        )
    });

    /// Histogram of the number of signatures on each certificate added to a write batch.
    pub(super) static CERTIFICATE_SIGNER_COUNT: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_signer_count",
            "Number of validator signatures attached to each confirmed certificate",
            linear_bucket_interval(1.0, 1.0, 20.0),
        )
    });

    /// Histogram of the time taken to load a chain state; registered without labels.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Counts event reads, labelled by source.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts event existence checks, labelled by source.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts events added to a write batch.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_event",
            "The metric counting how often an event is written to storage",
        )
    });

    /// Counts lookups of a block hash by height, labelled by source.
    #[doc(hidden)]
    pub(super) static READ_BLOCK_HASH_BY_HEIGHT_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_block_hash_by_height",
                "The metric counting how often a block hash is read by height from storage",
                &[SOURCE_LABEL],
            )
        });

    /// Counts lookups of the block height associated with an event, labelled by source.
    #[doc(hidden)]
    pub(super) static READ_EVENT_BLOCK_HEIGHT_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_event_block_height",
                "The metric counting how often an event block height is read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// Counts reads of the network description, labelled by source.
    // NOTE(review): the metric name lacks the `read_` prefix used by its siblings; renaming it
    // would change the exported metric, so it is left as is.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// Counts network descriptions added to a write batch.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
        )
    });
}
296
/// Key of a blob's raw bytes within the partition rooted at `RootKey::BlobId`.
const BLOB_KEY: &[u8] = &[0];

/// Key of a blob's BCS-serialized `BlobState` within the partition rooted at `RootKey::BlobId`.
const BLOB_STATE_KEY: &[u8] = &[1];

/// Key of the BCS-serialized lite certificate within the partition rooted at
/// `RootKey::BlockHash`.
const LITE_CERTIFICATE_KEY: &[u8] = &[2];

/// Key of the BCS-serialized certificate value (the confirmed block) within the partition
/// rooted at `RootKey::BlockHash`.
const BLOCK_KEY: &[u8] = &[3];

/// Key of the BCS-serialized `NetworkDescription` within the partition rooted at
/// `RootKey::NetworkDescription`.
const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];
311
312fn get_block_keys() -> Vec<Vec<u8>> {
313 vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
314}
315
/// A set of pending writes grouped by the root key (partition) they belong to.
#[derive(Default)]
#[expect(clippy::type_complexity)]
struct MultiPartitionBatch {
    /// For each root key, the `(key, value)` pairs to write under it, in insertion order.
    keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
}
321
322impl MultiPartitionBatch {
323 fn new() -> Self {
324 Self::default()
325 }
326
327 fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
328 let entry = self.keys_value_bytes.entry(root_key).or_default();
329 entry.extend(key_values);
330 }
331
332 fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
333 self.put_key_values(root_key, vec![(key, value)]);
334 }
335
336 fn add_blob(&mut self, blob: &Blob) {
337 #[cfg(with_metrics)]
338 metrics::WRITE_BLOB_COUNTER.inc();
339 let root_key = RootKey::BlobId(blob.id()).bytes();
340 let key = BLOB_KEY.to_vec();
341 self.put_key_value(root_key, key, blob.bytes().to_vec());
342 }
343
344 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
345 let root_key = RootKey::BlobId(blob_id).bytes();
346 let key = BLOB_STATE_KEY.to_vec();
347 let value = bcs::to_bytes(blob_state)?;
348 self.put_key_value(root_key, key, value);
349 Ok(())
350 }
351
352 fn add_certificate(
361 &mut self,
362 certificate: &ConfirmedBlockCertificate,
363 ) -> Result<(), ViewError> {
364 #[cfg(with_metrics)]
365 {
366 metrics::WRITE_CERTIFICATE_COUNTER.inc();
367 metrics::CERTIFICATE_SIGNER_COUNT.observe(certificate.signatures().len() as f64);
368 }
369 let hash = certificate.hash();
370
371 let root_key = RootKey::BlockHash(hash).bytes();
373 let mut key_values = Vec::new();
374 let key = LITE_CERTIFICATE_KEY.to_vec();
375 let value = bcs::to_bytes(&certificate.lite_certificate())?;
376 #[cfg(with_metrics)]
377 metrics::CERTIFICATE_LITE_BYTES.observe(value.len() as f64);
378 key_values.push((key, value));
379 let key = BLOCK_KEY.to_vec();
380 let value = bcs::to_bytes(&certificate.value())?;
381 #[cfg(with_metrics)]
382 metrics::CERTIFICATE_VALUE_BYTES.observe(value.len() as f64);
383 key_values.push((key, value));
384 self.put_key_values(root_key, key_values);
385
386 let chain_id = certificate.value().block().header.chain_id;
388 let height = certificate.value().block().header.height;
389 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
390 let height_key = to_height_key(height);
391 let index_value = bcs::to_bytes(&hash)?;
392 self.put_key_value(index_root_key, height_key, index_value);
393
394 let event_index_root_key = RootKey::EventBlockHeight(chain_id).bytes();
396 let height_value = bcs::to_bytes(&height)?;
397 for event in certificate.value().block().body.events.iter().flatten() {
398 let event_key = to_event_key(&EventId {
399 chain_id,
400 stream_id: event.stream_id.clone(),
401 index: event.index,
402 });
403 self.put_key_value(
404 event_index_root_key.clone(),
405 event_key,
406 height_value.clone(),
407 );
408 }
409
410 Ok(())
411 }
412
413 fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
414 #[cfg(with_metrics)]
415 metrics::WRITE_EVENT_COUNTER.inc();
416 let key = to_event_key(event_id);
417 let root_key = RootKey::Event(event_id.chain_id).bytes();
418 self.put_key_value(root_key, key, value);
419 }
420
421 fn add_network_description(
422 &mut self,
423 information: &NetworkDescription,
424 ) -> Result<(), ViewError> {
425 #[cfg(with_metrics)]
426 metrics::WRITE_NETWORK_DESCRIPTION.inc();
427 let root_key = RootKey::NetworkDescription.bytes();
428 let key = NETWORK_DESCRIPTION_KEY.to_vec();
429 let value = bcs::to_bytes(information)?;
430 self.put_key_value(root_key, key, value);
431 Ok(())
432 }
433}
434
/// Sizing parameters for the in-memory caches held by [`StorageCaches`].
///
/// Each `*_cache_size` is passed as the size argument of the corresponding `ValueCache`.
// NOTE(review): whether sizes are entry counts or bytes depends on `linera_cache::ValueCache`;
// confirm there.
#[derive(Clone, Copy, Debug)]
pub struct StorageCacheConfig {
    /// Size of the blob cache.
    pub blob_cache_size: usize,
    /// Size of the confirmed-block cache.
    pub confirmed_block_cache_size: usize,
    /// Size of the cache of deserialized certificates.
    pub certificate_cache_size: usize,
    /// Size of the cache of raw (still serialized) certificates.
    pub certificate_raw_cache_size: usize,
    /// Size of the event cache.
    pub event_cache_size: usize,
    /// Size of the `(chain, height) -> block hash` cache.
    pub block_hash_by_height_cache_size: usize,
    /// Size of the `event -> block height` cache.
    pub event_block_height_cache_size: usize,
    /// Cleanup interval, in seconds, shared by all the caches.
    pub cache_cleanup_interval_secs: u64,
}
455
/// Cache configuration for tests: every cache is sized 1000 and the cleanup interval is the
/// `linera_cache` default.
#[cfg(with_testing)]
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    blob_cache_size: 1000,
    confirmed_block_cache_size: 1000,
    certificate_cache_size: 1000,
    certificate_raw_cache_size: 1000,
    event_cache_size: 1000,
    block_hash_by_height_cache_size: 1000,
    event_block_height_cache_size: 1000,
    cache_cleanup_interval_secs: linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS,
};
468
/// A certificate in its stored form: `(lite certificate bytes, confirmed block bytes)`.
type RawCertificate = (Vec<u8>, Vec<u8>);
471
/// The in-memory caches sitting in front of the database.
///
/// Every field is behind an `Arc`, so cloning this struct shares the same caches.
#[derive(Clone)]
pub struct StorageCaches {
    /// Blobs, by blob ID.
    pub(crate) blob: Arc<ValueCache<BlobId, Blob>>,
    /// Confirmed blocks, by block hash.
    pub(crate) confirmed_block: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Deserialized certificates, by block hash.
    pub(crate) certificate: Arc<ValueCache<CryptoHash, ConfirmedBlockCertificate>>,
    /// Certificates in serialized form, by block hash.
    pub(crate) certificate_raw: Arc<ValueCache<CryptoHash, RawCertificate>>,
    /// Event payloads, by event ID.
    pub(crate) event: Arc<ValueCache<EventId, Vec<u8>>>,
    /// Block hashes, by chain and height.
    pub(crate) block_hash_by_height: Arc<ValueCache<(ChainId, BlockHeight), CryptoHash>>,
    /// Height of the block containing an event, by event ID.
    pub(crate) event_block_height: Arc<ValueCache<EventId, BlockHeight>>,
    /// The network description; a `OnceLock`, so it can be set at most once.
    pub(crate) network_description: Arc<OnceLock<NetworkDescription>>,
}
488
489impl StorageCaches {
490 pub fn new(sizes: StorageCacheConfig) -> Self {
492 let interval = sizes.cache_cleanup_interval_secs;
493 Self {
494 blob: Arc::new(ValueCache::new(
495 "storage_blob",
496 sizes.blob_cache_size,
497 interval,
498 )),
499 confirmed_block: Arc::new(ValueCache::new(
500 "storage_confirmed_block",
501 sizes.confirmed_block_cache_size,
502 interval,
503 )),
504 certificate: Arc::new(ValueCache::new(
505 "storage_certificate",
506 sizes.certificate_cache_size,
507 interval,
508 )),
509 certificate_raw: Arc::new(ValueCache::new(
510 "storage_certificate_raw",
511 sizes.certificate_raw_cache_size,
512 interval,
513 )),
514 event: Arc::new(ValueCache::new(
515 "storage_event",
516 sizes.event_cache_size,
517 interval,
518 )),
519 block_hash_by_height: Arc::new(ValueCache::new(
520 "storage_block_hash_by_height",
521 sizes.block_hash_by_height_cache_size,
522 interval,
523 )),
524 event_block_height: Arc::new(ValueCache::new(
525 "storage_event_block_height",
526 sizes.event_block_height_cache_size,
527 interval,
528 )),
529 network_description: Arc::new(OnceLock::new()),
530 }
531 }
532}
533
/// A `Storage` implementation backed by a key-value database, with in-memory caches in front.
///
/// The `Clock` type parameter defaults to [`WallClock`].
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying database; stores are opened from it per root key.
    database: Arc<Database>,
    /// The clock returned by `Storage::clock`.
    clock: Clock,
    /// Thread pool handed to every chain's runtime context.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime to use, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// User contract code by application ID, shared with every chain's runtime context.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// User service code by application ID, shared with every chain's runtime context.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// The value returned by `Storage::shared_committees`.
    shared_committees: SharedCommittees,
    /// In-memory caches for blobs, blocks, certificates, events and indices.
    caches: StorageCaches,
    /// Execution runtime configuration copied into every chain's runtime context.
    execution_runtime_config: ExecutionRuntimeConfig,
}
547
/// The root key of a partition in the database; its BCS serialization (see `RootKey::bytes`)
/// is what is passed to the database when opening a store.
// NOTE(review): BCS presumably encodes the variant index as the leading byte, and
// `CHAIN_ID_TAG` / `BLOB_ID_TAG` / `EVENT_ID_TAG` match the positions of `ChainState`,
// `BlobId` and `Event` below. Do not reorder variants without confirming.
#[derive(Debug, Serialize, Deserialize)]
pub enum RootKey {
    /// Partition holding the network description.
    NetworkDescription,
    /// Partition of a block exporter, identified by a `u32`.
    BlockExporterState(u32),
    /// Partition holding the state of a chain.
    ChainState(ChainId),
    /// Partition holding the certificate (lite certificate and value) for a block hash.
    BlockHash(CryptoHash),
    /// Partition holding a blob and its blob state.
    BlobId(BlobId),
    /// Partition holding the events of a chain.
    Event(ChainId),
    /// Partition holding the height-to-block-hash index of a chain.
    BlockByHeight(ChainId),
    /// Partition holding the event-to-block-height index of a chain.
    EventBlockHeight(ChainId),
}
568
/// First byte of a serialized `RootKey::ChainState`; checked by `is_chain_state`.
const CHAIN_ID_TAG: u8 = 2;
/// Presumably the first byte of a serialized `RootKey::BlobId` (its variant position is 4).
const BLOB_ID_TAG: u8 = 4;
/// Presumably the first byte of a serialized `RootKey::Event` (its variant position is 5).
const EVENT_ID_TAG: u8 = 5;
572
impl RootKey {
    /// Returns the BCS serialization of this root key.
    ///
    /// # Panics
    ///
    /// Panics if BCS serialization fails.
    pub fn bytes(&self) -> Vec<u8> {
        bcs::to_bytes(self).unwrap()
    }
}
579
/// An `EventId` without its chain ID; serialized to form the key of an event within a
/// per-chain partition (see `to_event_key`).
#[derive(Debug, Serialize, Deserialize)]
struct RestrictedEventId {
    /// The stream the event belongs to.
    pub stream_id: StreamId,
    /// The index of the event.
    pub index: u32,
}
585
586fn to_event_key(event_id: &EventId) -> Vec<u8> {
587 let restricted_event_id = RestrictedEventId {
588 stream_id: event_id.stream_id.clone(),
589 index: event_id.index,
590 };
591 bcs::to_bytes(&restricted_event_id).unwrap()
592}
593
/// Returns the BCS serialization of `height`, used as the key in the height-to-hash index.
///
/// # Panics
///
/// Panics if BCS serialization fails.
pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
    bcs::to_bytes(&height).unwrap()
}
597
598fn is_chain_state(root_key: &[u8]) -> bool {
599 if root_key.is_empty() {
600 return false;
601 }
602 root_key[0] == CHAIN_ID_TAG
603}
604
/// A dual-store root key assignment that places chain states in the first store and
/// everything else in the second.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
609
610impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
611 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
612 if root_key.is_empty() {
613 return Ok(StoreInUse::Second);
614 }
615 let store = match is_chain_state(root_key) {
616 true => StoreInUse::First,
617 false => StoreInUse::Second,
618 };
619 Ok(store)
620 }
621}
622
/// A `Clock` implementation that reads the current time from `Timestamp::now()`.
#[derive(Clone)]
pub struct WallClock;
626
627#[cfg_attr(not(web), async_trait)]
628#[cfg_attr(web, async_trait(?Send))]
629impl Clock for WallClock {
630 fn current_time(&self) -> Timestamp {
631 Timestamp::now()
632 }
633
634 async fn sleep_until(&self, timestamp: Timestamp) {
635 let delta = timestamp.delta_since(Timestamp::now());
636 if delta > TimeDelta::ZERO {
637 linera_base::time::timer::sleep(delta.as_duration()).await
638 }
639 }
640}
641
/// The mutable state behind a [`TestClock`].
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current (manually controlled) time.
    time: Timestamp,
    /// Pending sleeps: for each wake-up time, the senders to notify once it is reached.
    /// Keys are wrapped in `Reverse`, so later wake-up times sort first.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional callback consulted on every new sleep; if it returns `true` for a wake-up time
    /// in the future, the clock jumps to that time instead of waiting.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
651
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the current time to `time` and wakes every pending sleep that is due by then.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are `Reverse<Timestamp>`, so the tail split off at `Reverse(time)` holds
        // exactly the sleeps whose wake-up time is at or before `time`.
        let due = self.sleeps.split_off(&Reverse(time));
        due.into_values().flatten().for_each(|sender| {
            // A failed send only means the receiver is gone; ignore it.
            sender.send(()).ok();
        });
    }

    /// Registers a sleep until `time` and returns the receiver that resolves when it is over.
    ///
    /// The sleep callback, if any, is always consulted. The sleep completes immediately if
    /// `time` is not in the future, or if the callback asked to auto-advance (in which case
    /// the clock is moved to `time` first). Otherwise the sleep is queued.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let auto_advance = match &self.sleep_callback {
            Some(callback) => callback(time),
            None => false,
        };
        if time <= self.time {
            // Already due: complete right away.
            sender.send(()).ok();
        } else if auto_advance {
            // Jump the clock forward (waking other due sleeps), then complete this sleep.
            self.set(time);
            sender.send(()).ok();
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
683
/// A manually controlled clock for tests. Clones share the same underlying state.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
689
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the clock's current (manually set) time.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // The mutex guard is a temporary of this statement, so it is released before awaiting.
        let receiver = self.lock().add_sleep_until(timestamp);
        // An error only means the sender was dropped; treat it as the end of the sleep.
        receiver.await.ok();
    }
}
704
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock in its default state.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time and wakes every sleep that is due by then.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by `delta` (saturating) and wakes every sleep that becomes due.
    pub fn add(&self, delta: TimeDelta) {
        let mut guard = self.lock();
        let time = guard.time.saturating_add(delta);
        guard.set(time);
    }

    /// Returns the clock's current time.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Installs a callback that is consulted on every new sleep.
    ///
    /// When the callback returns `true` for a wake-up time in the future, the clock jumps to
    /// that time and the sleep completes immediately. The callback runs while the clock's
    /// mutex is held.
    #[cfg(with_testing)]
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        self.lock().sleep_callback = Some(Box::new(callback));
    }

    /// Locks the inner state.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
745
746#[cfg_attr(not(web), async_trait)]
747#[cfg_attr(web, async_trait(?Send))]
748impl<Database, C> Storage for DbStorage<Database, C>
749where
750 Database: KeyValueDatabase<
751 Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
752 Error: Send + Sync,
753 > + Clone
754 + linera_base::util::traits::AutoTraits
755 + 'static,
756 C: Clock + Clone + Send + Sync + 'static,
757{
758 type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
759 type Clock = C;
760 type BlockExporterContext = ViewContext<u32, Database::Store>;
761
    /// Returns the clock used by this storage.
    fn clock(&self) -> &C {
        &self.clock
    }
765
    /// Returns the shared execution thread pool.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
769
    /// Returns the shared committees held by this storage.
    fn shared_committees(&self) -> &SharedCommittees {
        &self.shared_committees
    }
773
774 #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
775 async fn load_chain(
776 &self,
777 chain_id: ChainId,
778 ) -> Result<ChainStateView<Self::Context>, ViewError> {
779 #[cfg(with_metrics)]
780 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
781 let runtime_context = ChainRuntimeContext {
782 storage: self.clone(),
783 thread_pool: self.thread_pool.clone(),
784 chain_id,
785 execution_runtime_config: self.execution_runtime_config,
786 user_contracts: self.user_contracts.clone(),
787 user_services: self.user_services.clone(),
788 };
789 let root_key = RootKey::ChainState(chain_id).bytes();
790 let store = self.database.open_exclusive(&root_key)?;
791 let context = ViewContext::create_root_context(store, runtime_context).await?;
792 ChainStateView::load(context).await
793 }
794
795 #[instrument(level = "trace", skip_all, fields(%blob_id))]
796 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
797 if self.caches.blob.contains(&blob_id) {
798 #[cfg(with_metrics)]
799 metrics::CONTAINS_BLOB_COUNTER
800 .with_label_values(&[metrics::CACHE])
801 .inc();
802 return Ok(true);
803 }
804 let root_key = RootKey::BlobId(blob_id).bytes();
805 let store = self.database.open_shared(&root_key)?;
806 let test = store.contains_key(BLOB_KEY).await?;
807 #[cfg(with_metrics)]
808 metrics::CONTAINS_BLOB_COUNTER
809 .with_label_values(&[metrics::DB])
810 .inc();
811 Ok(test)
812 }
813
814 #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
815 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
816 let mut missing_blobs = Vec::new();
817 #[cfg(with_metrics)]
818 let mut cache_hits: u64 = 0;
819 #[cfg(with_metrics)]
820 let mut db_checks: u64 = 0;
821 for blob_id in blob_ids {
822 if self.caches.blob.contains(blob_id) {
823 #[cfg(with_metrics)]
824 {
825 cache_hits += 1;
826 }
827 continue;
828 }
829 #[cfg(with_metrics)]
830 {
831 db_checks += 1;
832 }
833 let root_key = RootKey::BlobId(*blob_id).bytes();
834 let store = self.database.open_shared(&root_key)?;
835 if !store.contains_key(BLOB_KEY).await? {
836 missing_blobs.push(*blob_id);
837 }
838 }
839 #[cfg(with_metrics)]
840 {
841 if cache_hits > 0 {
842 metrics::CONTAINS_BLOBS_COUNTER
843 .with_label_values(&[metrics::CACHE])
844 .inc_by(cache_hits);
845 }
846 if db_checks > 0 {
847 metrics::CONTAINS_BLOBS_COUNTER
848 .with_label_values(&[metrics::DB])
849 .inc_by(db_checks);
850 }
851 }
852 Ok(missing_blobs)
853 }
854
855 #[instrument(skip_all, fields(%blob_id))]
856 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
857 let root_key = RootKey::BlobId(blob_id).bytes();
858 let store = self.database.open_shared(&root_key)?;
859 let test = store.contains_key(BLOB_STATE_KEY).await?;
860 #[cfg(with_metrics)]
861 metrics::CONTAINS_BLOB_STATE_COUNTER
862 .with_label_values(&[metrics::DB])
863 .inc();
864 Ok(test)
865 }
866
867 #[instrument(skip_all, fields(%hash))]
868 async fn read_confirmed_block(
869 &self,
870 hash: CryptoHash,
871 ) -> Result<Option<CacheArc<ConfirmedBlock>>, ViewError> {
872 if let Some(block) = self.caches.confirmed_block.get(&hash) {
873 #[cfg(with_metrics)]
874 metrics::READ_CONFIRMED_BLOCK_COUNTER
875 .with_label_values(&[metrics::CACHE])
876 .inc();
877 return Ok(Some(block));
878 }
879 let root_key = RootKey::BlockHash(hash).bytes();
880 let store = self.database.open_shared(&root_key)?;
881 let value = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await?;
882 #[cfg(with_metrics)]
883 metrics::READ_CONFIRMED_BLOCK_COUNTER
884 .with_label_values(&[metrics::DB])
885 .inc();
886 match value {
887 Some(block) => Ok(Some(self.caches.confirmed_block.insert(&hash, block))),
888 None => Ok(None),
889 }
890 }
891
892 #[instrument(skip_all)]
893 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
894 &self,
895 hashes: I,
896 ) -> Result<Vec<Option<CacheArc<ConfirmedBlock>>>, ViewError> {
897 let hashes = hashes.into_iter().collect::<Vec<_>>();
898 if hashes.is_empty() {
899 return Ok(Vec::new());
900 }
901 let mut results = vec![None; hashes.len()];
902 let mut misses = Vec::new();
903 for (i, hash) in hashes.iter().enumerate() {
904 if let Some(block) = self.caches.confirmed_block.get(hash) {
905 results[i] = Some(block);
906 } else {
907 misses.push(i);
908 }
909 }
910 if !misses.is_empty() {
911 let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
912 let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
913 for (miss_idx, root_key) in misses.iter().zip(root_keys) {
914 let store = self.database.open_shared(&root_key)?;
915 if let Some(block) = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await? {
916 results[*miss_idx] = Some(
917 self.caches
918 .confirmed_block
919 .insert(&hashes[*miss_idx], block),
920 );
921 }
922 }
923 }
924 #[cfg(with_metrics)]
925 {
926 let cache_hits = (hashes.len() - misses.len()) as u64;
927 if cache_hits > 0 {
928 metrics::READ_CONFIRMED_BLOCKS_COUNTER
929 .with_label_values(&[metrics::CACHE])
930 .inc_by(cache_hits);
931 }
932 let db_reads = misses.len() as u64;
933 if db_reads > 0 {
934 metrics::READ_CONFIRMED_BLOCKS_COUNTER
935 .with_label_values(&[metrics::DB])
936 .inc_by(db_reads);
937 }
938 }
939 Ok(results)
940 }
941
942 #[instrument(skip_all, fields(%blob_id))]
943 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<CacheArc<Blob>>, ViewError> {
944 if let Some(blob) = self.caches.blob.get(&blob_id) {
945 #[cfg(with_metrics)]
946 metrics::READ_BLOB_COUNTER
947 .with_label_values(&[metrics::CACHE])
948 .inc();
949 return Ok(Some(blob));
950 }
951 let root_key = RootKey::BlobId(blob_id).bytes();
952 let store = self.database.open_shared(&root_key)?;
953 let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
954 #[cfg(with_metrics)]
955 metrics::READ_BLOB_COUNTER
956 .with_label_values(&[metrics::DB])
957 .inc();
958 match maybe_blob_bytes {
959 Some(blob_bytes) => {
960 let blob = Blob::new_with_id_unchecked(blob_id, blob_bytes);
961 Ok(Some(self.caches.blob.insert(&blob_id, blob)))
962 }
963 None => Ok(None),
964 }
965 }
966
967 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
968 async fn read_blobs(
969 &self,
970 blob_ids: &[BlobId],
971 ) -> Result<Vec<Option<CacheArc<Blob>>>, ViewError> {
972 if blob_ids.is_empty() {
973 return Ok(Vec::new());
974 }
975 futures::future::try_join_all(blob_ids.iter().map(|blob_id| self.read_blob(*blob_id))).await
981 }
982
983 #[instrument(skip_all, fields(%blob_id))]
984 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
985 let root_key = RootKey::BlobId(blob_id).bytes();
986 let store = self.database.open_shared(&root_key)?;
987 let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
988 #[cfg(with_metrics)]
989 metrics::READ_BLOB_STATE_COUNTER
990 .with_label_values(&[metrics::DB])
991 .inc();
992 Ok(blob_state)
993 }
994
995 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
996 async fn read_blob_states(
997 &self,
998 blob_ids: &[BlobId],
999 ) -> Result<Vec<Option<BlobState>>, ViewError> {
1000 if blob_ids.is_empty() {
1001 return Ok(Vec::new());
1002 }
1003 futures::future::try_join_all(
1004 blob_ids
1005 .iter()
1006 .map(|blob_id| self.read_blob_state(*blob_id)),
1007 )
1008 .await
1009 }
1010
1011 #[instrument(skip_all, fields(blob_id = %blob.id()))]
1012 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
1013 let mut batch = MultiPartitionBatch::new();
1014 batch.add_blob(blob);
1015 self.write_batch(batch).await?;
1016 Ok(())
1017 }
1018
1019 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1020 async fn maybe_write_blob_states(
1021 &self,
1022 blob_ids: &[BlobId],
1023 blob_state: BlobState,
1024 ) -> Result<(), ViewError> {
1025 if blob_ids.is_empty() {
1026 return Ok(());
1027 }
1028 let mut maybe_blob_states = Vec::new();
1029 for blob_id in blob_ids {
1030 let root_key = RootKey::BlobId(*blob_id).bytes();
1031 let store = self.database.open_shared(&root_key)?;
1032 let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1033 maybe_blob_states.push(maybe_blob_state);
1034 }
1035 let mut batch = MultiPartitionBatch::new();
1036 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
1037 match maybe_blob_state {
1038 None => {
1039 batch.add_blob_state(*blob_id, &blob_state)?;
1040 }
1041 Some(state) => {
1042 if state.epoch < blob_state.epoch {
1043 batch.add_blob_state(*blob_id, &blob_state)?;
1044 }
1045 }
1046 }
1047 }
1048 self.write_batch(batch).await?;
1052 Ok(())
1053 }
1054
1055 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1056 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
1057 if blobs.is_empty() {
1058 return Ok(Vec::new());
1059 }
1060 let mut batch = MultiPartitionBatch::new();
1061 let mut blob_states = Vec::new();
1062 for blob in blobs {
1063 let root_key = RootKey::BlobId(blob.id()).bytes();
1064 let store = self.database.open_shared(&root_key)?;
1065 let has_state = store.contains_key(BLOB_STATE_KEY).await?;
1066 blob_states.push(has_state);
1067 if has_state {
1068 batch.add_blob(blob);
1069 }
1070 }
1071 self.write_batch(batch).await?;
1072 Ok(blob_states)
1073 }
1074
1075 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1076 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
1077 if blobs.is_empty() {
1078 return Ok(());
1079 }
1080 let mut batch = MultiPartitionBatch::new();
1081 for blob in blobs {
1082 batch.add_blob(blob);
1083 }
1084 self.write_batch(batch).await
1085 }
1086
1087 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1088 async fn write_blobs_and_certificate(
1089 &self,
1090 blobs: &[Blob],
1091 certificate: &ConfirmedBlockCertificate,
1092 ) -> Result<(), ViewError> {
1093 let mut batch = MultiPartitionBatch::new();
1094 for blob in blobs {
1095 batch.add_blob(blob);
1096 }
1097 batch.add_certificate(certificate)?;
1098 self.write_batch(batch).await?;
1099 let block = certificate.value().block();
1101 let chain_id = block.header.chain_id;
1102 let height = block.header.height;
1103 let hash = certificate.hash();
1104 self.caches
1105 .block_hash_by_height
1106 .insert(&(chain_id, height), hash);
1107 for event in block.body.events.iter().flatten() {
1108 let event_id = EventId {
1109 chain_id,
1110 stream_id: event.stream_id.clone(),
1111 index: event.index,
1112 };
1113 self.caches.event_block_height.insert(&event_id, height);
1114 }
1115 Ok(())
1116 }
1117
1118 fn cache_certificate(
1119 &self,
1120 certificate: ConfirmedBlockCertificate,
1121 ) -> CacheArc<ConfirmedBlockCertificate> {
1122 self.caches
1123 .certificate
1124 .insert(&certificate.hash(), certificate)
1125 }
1126
1127 fn cache_blob(&self, blob: Blob) -> CacheArc<Blob> {
1128 self.caches.blob.insert(&blob.id(), blob)
1129 }
1130
1131 fn cache_confirmed_block(&self, block: ConfirmedBlock) -> CacheArc<ConfirmedBlock> {
1132 self.caches.confirmed_block.insert(&block.hash(), block)
1133 }
1134
1135 #[instrument(skip_all, fields(%hash))]
1136 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
1137 if self.caches.certificate.contains(&hash) || self.caches.certificate_raw.contains(&hash) {
1138 #[cfg(with_metrics)]
1139 metrics::CONTAINS_CERTIFICATE_COUNTER
1140 .with_label_values(&[metrics::CACHE])
1141 .inc();
1142 return Ok(true);
1143 }
1144 let root_key = RootKey::BlockHash(hash).bytes();
1145 let store = self.database.open_shared(&root_key)?;
1146 let results = store.contains_keys(&get_block_keys()).await?;
1147 #[cfg(with_metrics)]
1148 metrics::CONTAINS_CERTIFICATE_COUNTER
1149 .with_label_values(&[metrics::DB])
1150 .inc();
1151 Ok(results[0] && results[1])
1152 }
1153
1154 #[instrument(skip_all, fields(%hash))]
1155 async fn read_certificate(
1156 &self,
1157 hash: CryptoHash,
1158 ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, ViewError> {
1159 if let Some(cert) = self.caches.certificate.get(&hash) {
1161 #[cfg(with_metrics)]
1162 metrics::READ_CERTIFICATE_COUNTER
1163 .with_label_values(&[metrics::CACHE])
1164 .inc();
1165 return Ok(Some(cert));
1166 }
1167 if let Some(raw) = self.caches.certificate_raw.get(&hash) {
1169 #[cfg(with_metrics)]
1170 metrics::READ_CERTIFICATE_COUNTER
1171 .with_label_values(&[metrics::CACHE])
1172 .inc();
1173 return self.deserialize_and_cache_certificate(&raw.0, &raw.1);
1174 }
1175 let root_key = RootKey::BlockHash(hash).bytes();
1177 let store = self.database.open_shared(&root_key)?;
1178 let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1179 #[cfg(with_metrics)]
1180 metrics::READ_CERTIFICATE_COUNTER
1181 .with_label_values(&[metrics::DB])
1182 .inc();
1183 let Some(lite_cert_bytes) = values[0].as_ref() else {
1184 return Ok(None);
1185 };
1186 let Some(confirmed_block_bytes) = values[1].as_ref() else {
1187 return Ok(None);
1188 };
1189 self.caches.certificate_raw.insert(
1190 &hash,
1191 (lite_cert_bytes.clone(), confirmed_block_bytes.clone()),
1192 );
1193 self.deserialize_and_cache_certificate(lite_cert_bytes, confirmed_block_bytes)
1194 }
1195
1196 #[instrument(skip_all)]
1197 async fn read_certificates(
1198 &self,
1199 hashes: &[CryptoHash],
1200 ) -> Result<Vec<Option<CacheArc<ConfirmedBlockCertificate>>>, ViewError> {
1201 let raw_certs = self.read_certificates_raw(hashes).await?;
1202
1203 raw_certs
1204 .into_iter()
1205 .map(|maybe_raw| {
1206 let Some(raw) = maybe_raw else {
1207 return Ok(None);
1208 };
1209 self.deserialize_and_cache_certificate(&raw.0, &raw.1)
1210 })
1211 .collect()
1212 }
1213
1214 #[instrument(skip_all)]
1215 async fn read_certificates_raw(
1216 &self,
1217 hashes: &[CryptoHash],
1218 ) -> Result<Vec<Option<CacheArc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1219 if hashes.is_empty() {
1220 return Ok(Vec::new());
1221 }
1222 let mut results = vec![None; hashes.len()];
1223 let mut misses = Vec::new();
1224 for (i, hash) in hashes.iter().enumerate() {
1225 if let Some(raw) = self.caches.certificate_raw.get(hash) {
1226 results[i] = Some(raw);
1227 } else {
1228 misses.push(i);
1229 }
1230 }
1231 if !misses.is_empty() {
1232 let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
1233 let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
1234 for (miss_idx, root_key) in misses.iter().zip(root_keys) {
1235 let store = self.database.open_shared(&root_key)?;
1236 let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1237 if let (Some(lite), Some(block)) = (values[0].as_ref(), values[1].as_ref()) {
1238 results[*miss_idx] = Some(
1239 self.caches
1240 .certificate_raw
1241 .insert(&hashes[*miss_idx], (lite.clone(), block.clone())),
1242 );
1243 }
1244 }
1245 }
1246 #[cfg(with_metrics)]
1247 {
1248 let cache_hits = (hashes.len() - misses.len()) as u64;
1249 if cache_hits > 0 {
1250 metrics::READ_CERTIFICATES_COUNTER
1251 .with_label_values(&[metrics::CACHE])
1252 .inc_by(cache_hits);
1253 }
1254 let db_reads = misses.len() as u64;
1255 if db_reads > 0 {
1256 metrics::READ_CERTIFICATES_COUNTER
1257 .with_label_values(&[metrics::DB])
1258 .inc_by(db_reads);
1259 }
1260 }
1261 Ok(results)
1262 }
1263
1264 async fn read_certificate_hashes_by_heights(
1265 &self,
1266 chain_id: ChainId,
1267 heights: &[BlockHeight],
1268 ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1269 if heights.is_empty() {
1270 return Ok(Vec::new());
1271 }
1272
1273 let mut results = vec![None; heights.len()];
1274 let mut misses = Vec::new();
1275 for (i, &height) in heights.iter().enumerate() {
1276 if let Some(hash) = self.caches.block_hash_by_height.get(&(chain_id, height)) {
1277 results[i] = Some(*hash);
1278 } else {
1279 misses.push(i);
1280 }
1281 }
1282 #[cfg(with_metrics)]
1283 {
1284 let cache_hits = (heights.len() - misses.len()) as u64;
1285 if cache_hits > 0 {
1286 metrics::READ_BLOCK_HASH_BY_HEIGHT_COUNTER
1287 .with_label_values(&[metrics::CACHE])
1288 .inc_by(cache_hits);
1289 }
1290 }
1291 if !misses.is_empty() {
1292 let miss_keys: Vec<Vec<u8>> =
1293 misses.iter().map(|&i| to_height_key(heights[i])).collect();
1294 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1295 let store = self.database.open_shared(&index_root_key)?;
1296 let hash_bytes = store.read_multi_values_bytes(&miss_keys).await?;
1297 #[cfg(with_metrics)]
1298 {
1299 let db_reads = misses.len() as u64;
1300 metrics::READ_BLOCK_HASH_BY_HEIGHT_COUNTER
1301 .with_label_values(&[metrics::DB])
1302 .inc_by(db_reads);
1303 }
1304 for (miss_idx, opt_bytes) in misses.iter().zip(hash_bytes) {
1305 if let Some(bytes) = opt_bytes {
1306 let hash = bcs::from_bytes::<CryptoHash>(&bytes)?;
1307 self.caches
1308 .block_hash_by_height
1309 .insert(&(chain_id, heights[*miss_idx]), hash);
1310 results[*miss_idx] = Some(hash);
1311 }
1312 }
1313 }
1314
1315 Ok(results)
1316 }
1317
1318 async fn read_event_block_heights(
1319 &self,
1320 event_ids: &[EventId],
1321 ) -> Result<Vec<Option<BlockHeight>>, ViewError> {
1322 if event_ids.is_empty() {
1323 return Ok(Vec::new());
1324 }
1325
1326 let mut results = vec![None; event_ids.len()];
1327 let mut misses: Vec<usize> = Vec::new();
1329 for (i, event_id) in event_ids.iter().enumerate() {
1330 if let Some(height) = self.caches.event_block_height.get(event_id) {
1331 results[i] = Some(*height);
1332 } else {
1333 misses.push(i);
1334 }
1335 }
1336 #[cfg(with_metrics)]
1337 {
1338 let cache_hits = (event_ids.len() - misses.len()) as u64;
1339 if cache_hits > 0 {
1340 metrics::READ_EVENT_BLOCK_HEIGHT_COUNTER
1341 .with_label_values(&[metrics::CACHE])
1342 .inc_by(cache_hits);
1343 }
1344 }
1345 if misses.is_empty() {
1346 return Ok(results);
1347 }
1348 let mut chain_groups = BTreeMap::<_, Vec<_>>::new();
1350 for &i in &misses {
1351 let event_id = &event_ids[i];
1352 chain_groups
1353 .entry(event_id.chain_id)
1354 .or_default()
1355 .push((i, to_event_key(event_id)));
1356 }
1357 for (chain_id, entries) in chain_groups {
1358 let root_key = RootKey::EventBlockHeight(chain_id).bytes();
1359 let store = self.database.open_shared(&root_key)?;
1360 let keys = entries
1361 .iter()
1362 .map(|(_, key)| key.clone())
1363 .collect::<Vec<_>>();
1364 let values = store.read_multi_values_bytes(&keys).await?;
1365 #[cfg(with_metrics)]
1366 {
1367 let db_reads = entries.len() as u64;
1368 metrics::READ_EVENT_BLOCK_HEIGHT_COUNTER
1369 .with_label_values(&[metrics::DB])
1370 .inc_by(db_reads);
1371 }
1372 for ((original_index, _), value) in entries.into_iter().zip(values) {
1373 if let Some(bytes) = value {
1374 let height = bcs::from_bytes::<BlockHeight>(&bytes)?;
1375 self.caches
1376 .event_block_height
1377 .insert(&event_ids[original_index], height);
1378 results[original_index] = Some(height);
1379 }
1380 }
1381 }
1382 Ok(results)
1383 }
1384
1385 #[instrument(skip_all)]
1386 async fn read_certificates_by_heights_raw(
1387 &self,
1388 chain_id: ChainId,
1389 heights: &[BlockHeight],
1390 ) -> Result<Vec<Option<CacheArc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1391 let hashes: Vec<Option<CryptoHash>> = self
1392 .read_certificate_hashes_by_heights(chain_id, heights)
1393 .await?;
1394
1395 let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
1397 for (index, maybe_hash) in hashes.iter().enumerate() {
1398 if let Some(hash) = maybe_hash {
1399 indices.entry(*hash).or_default().push(index);
1400 }
1401 }
1402
1403 let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
1405
1406 let mut result = vec![None; heights.len()];
1407
1408 for (raw_cert, hash) in self
1409 .read_certificates_raw(&unique_hashes)
1410 .await?
1411 .into_iter()
1412 .zip(unique_hashes)
1413 {
1414 if let Some(idx_list) = indices.get(&hash) {
1415 for &index in idx_list {
1416 result[index] = raw_cert.clone();
1417 }
1418 } else {
1419 tracing::error!(?hash, "certificate hash not found in indices map",);
1421 }
1422 }
1423
1424 Ok(result)
1425 }
1426
1427 #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1428 async fn read_certificates_by_heights(
1429 &self,
1430 chain_id: ChainId,
1431 heights: &[BlockHeight],
1432 ) -> Result<Vec<Option<CacheArc<ConfirmedBlockCertificate>>>, ViewError> {
1433 self.read_certificates_by_heights_raw(chain_id, heights)
1434 .await?
1435 .into_iter()
1436 .map(|maybe_raw| match maybe_raw {
1437 None => Ok(None),
1438 Some(raw) => self.deserialize_and_cache_certificate(&raw.0, &raw.1),
1439 })
1440 .collect()
1441 }
1442
1443 #[instrument(skip_all, fields(event_id = ?event_id))]
1444 async fn read_event(&self, event_id: EventId) -> Result<Option<CacheArc<Vec<u8>>>, ViewError> {
1445 if let Some(event) = self.caches.event.get(&event_id) {
1446 #[cfg(with_metrics)]
1447 metrics::READ_EVENT_COUNTER
1448 .with_label_values(&[metrics::CACHE])
1449 .inc();
1450 return Ok(Some(event));
1451 }
1452 let event_key = to_event_key(&event_id);
1453 let root_key = RootKey::Event(event_id.chain_id).bytes();
1454 let store = self.database.open_shared(&root_key)?;
1455 let event = store.read_value_bytes(&event_key).await?;
1456 #[cfg(with_metrics)]
1457 metrics::READ_EVENT_COUNTER
1458 .with_label_values(&[metrics::DB])
1459 .inc();
1460 match event {
1461 Some(event_bytes) => Ok(Some(self.caches.event.insert(&event_id, event_bytes))),
1462 None => Ok(None),
1463 }
1464 }
1465
1466 #[instrument(skip_all, fields(event_id = ?event_id))]
1467 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1468 if self.caches.event.contains(&event_id) {
1469 #[cfg(with_metrics)]
1470 metrics::CONTAINS_EVENT_COUNTER
1471 .with_label_values(&[metrics::CACHE])
1472 .inc();
1473 return Ok(true);
1474 }
1475 let event_key = to_event_key(&event_id);
1476 let root_key = RootKey::Event(event_id.chain_id).bytes();
1477 let store = self.database.open_shared(&root_key)?;
1478 let exists = store.contains_key(&event_key).await?;
1479 #[cfg(with_metrics)]
1480 metrics::CONTAINS_EVENT_COUNTER
1481 .with_label_values(&[metrics::DB])
1482 .inc();
1483 Ok(exists)
1484 }
1485
1486 #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
1487 async fn read_events_from_index(
1488 &self,
1489 chain_id: &ChainId,
1490 stream_id: &StreamId,
1491 start_index: u32,
1492 ) -> Result<Vec<IndexAndEvent>, ViewError> {
1493 let root_key = RootKey::Event(*chain_id).bytes();
1494 let store = self.database.open_shared(&root_key)?;
1495 let mut entries = Vec::new();
1498 let mut db_keys = Vec::new();
1499 let prefix = bcs::to_bytes(stream_id).unwrap();
1500 for short_key in store.find_keys_by_prefix(&prefix).await? {
1501 let index = bcs::from_bytes::<u32>(&short_key)?;
1502 if index >= start_index {
1503 let event_id = EventId {
1504 chain_id: *chain_id,
1505 stream_id: stream_id.clone(),
1506 index,
1507 };
1508 let cached = self.caches.event.get(&event_id).map(|arc| (*arc).clone());
1509 if cached.is_none() {
1510 let mut key = prefix.clone();
1511 key.extend(short_key);
1512 db_keys.push(key);
1513 }
1514 entries.push((index, cached));
1515 }
1516 }
1517 let mut db_values = if db_keys.is_empty() {
1518 Vec::new()
1519 } else {
1520 store.read_multi_values_bytes(&db_keys).await?
1521 }
1522 .into_iter();
1523 let mut returned_values = Vec::with_capacity(entries.len());
1524 for (index, cached) in entries {
1525 let event = match cached {
1526 Some(event) => event,
1527 None => {
1528 let event_bytes = db_values
1529 .next()
1530 .expect("one database value per cache miss")
1531 .unwrap();
1532 let event_id = EventId {
1533 chain_id: *chain_id,
1534 stream_id: stream_id.clone(),
1535 index,
1536 };
1537 self.caches.event.insert(&event_id, event_bytes.clone());
1538 event_bytes
1539 }
1540 };
1541 returned_values.push(IndexAndEvent { index, event });
1542 }
1543 Ok(returned_values)
1544 }
1545
1546 #[instrument(skip_all)]
1547 async fn write_events(
1548 &self,
1549 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1550 ) -> Result<(), ViewError> {
1551 let mut batch = MultiPartitionBatch::new();
1552 for (event_id, value) in events {
1553 batch.add_event(&event_id, value);
1554 }
1555 self.write_batch(batch).await
1556 }
1557
1558 #[instrument(skip_all)]
1559 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1560 if let Some(desc) = self.caches.network_description.get() {
1561 #[cfg(with_metrics)]
1562 metrics::READ_NETWORK_DESCRIPTION
1563 .with_label_values(&[metrics::CACHE])
1564 .inc();
1565 return Ok(Some(desc.clone()));
1566 }
1567 let root_key = RootKey::NetworkDescription.bytes();
1568 let store = self.database.open_shared(&root_key)?;
1569 let maybe_value: Option<NetworkDescription> =
1570 store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1571 #[cfg(with_metrics)]
1572 metrics::READ_NETWORK_DESCRIPTION
1573 .with_label_values(&[metrics::DB])
1574 .inc();
1575 if let Some(ref desc) = maybe_value {
1576 if self.caches.network_description.set(desc.clone()).is_err() {
1577 debug!("network description cache was already populated concurrently");
1578 }
1579 }
1580 Ok(maybe_value)
1581 }
1582
1583 #[instrument(skip_all)]
1584 async fn write_network_description(
1585 &self,
1586 information: &NetworkDescription,
1587 ) -> Result<(), ViewError> {
1588 let mut batch = MultiPartitionBatch::new();
1589 batch.add_network_description(information)?;
1590 self.write_batch(batch).await?;
1591 Ok(())
1592 }
1593
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1597
1598 #[instrument(skip_all)]
1599 async fn block_exporter_context(
1600 &self,
1601 block_exporter_id: u32,
1602 ) -> Result<Self::BlockExporterContext, ViewError> {
1603 let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1604 let store = self.database.open_exclusive(&root_key)?;
1605 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1606 }
1607
1608 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1609 let root_keys = self.database.list_root_keys().await?;
1610 let mut blob_ids = Vec::new();
1611 for root_key in root_keys {
1612 if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1613 let root_key_red = &root_key[1..];
1614 let blob_id = bcs::from_bytes(root_key_red)?;
1615 blob_ids.push(blob_id);
1616 }
1617 }
1618 Ok(blob_ids)
1619 }
1620
1621 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1622 let root_keys = self.database.list_root_keys().await?;
1623 let mut chain_ids = Vec::new();
1624 for root_key in root_keys {
1625 if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1626 let root_key_red = &root_key[1..];
1627 let chain_id = bcs::from_bytes(root_key_red)?;
1628 chain_ids.push(chain_id);
1629 }
1630 }
1631 Ok(chain_ids)
1632 }
1633
1634 async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
1635 let root_keys = self.database.list_root_keys().await?;
1636 let mut event_ids = Vec::new();
1637 for root_key in root_keys {
1638 if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
1639 let root_key_red = &root_key[1..];
1640 let chain_id = bcs::from_bytes(root_key_red)?;
1641 let store = self.database.open_shared(&root_key)?;
1642 let keys = store.find_keys_by_prefix(&[]).await?;
1643 for key in keys {
1644 let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
1645 let event_id = EventId {
1646 chain_id,
1647 stream_id: restricted_event_id.stream_id,
1648 index: restricted_event_id.index,
1649 };
1650 event_ids.push(event_id);
1651 }
1652 }
1653 }
1654 Ok(event_ids)
1655 }
1656}
1657
1658impl<Database, C> DbStorage<Database, C>
1659where
1660 Database: KeyValueDatabase + Clone,
1661 Database::Store: KeyValueStore + Clone,
1662 C: Clock,
1663 Database::Error: Send + Sync,
1664{
1665 #[instrument(skip_all)]
1666 fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1667 hashes
1668 .iter()
1669 .map(|hash| RootKey::BlockHash(*hash).bytes())
1670 .collect()
1671 }
1672
1673 fn deserialize_and_cache_certificate(
1674 &self,
1675 lite_cert_bytes: &[u8],
1676 confirmed_block_bytes: &[u8],
1677 ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, ViewError> {
1678 let lite = bcs::from_bytes::<LiteCertificate>(lite_cert_bytes)?;
1679 let block = bcs::from_bytes::<ConfirmedBlock>(confirmed_block_bytes)?;
1680 let hash = block.hash();
1681 self.caches.confirmed_block.insert(&hash, block.clone());
1682 let certificate = lite
1683 .with_value(block)
1684 .ok_or(ViewError::InconsistentEntries)?;
1685 let arc = self.caches.certificate.insert(&hash, certificate);
1686 Ok(Some(arc))
1687 }
1688
1689 #[instrument(skip_all)]
1690 async fn write_entry(
1691 store: &Database::Store,
1692 key_values: Vec<(Vec<u8>, Vec<u8>)>,
1693 ) -> Result<(), ViewError> {
1694 let mut batch = Batch::new();
1695 for (key, value) in key_values {
1696 batch.put_key_value_bytes(key, value);
1697 }
1698 store.write_batch(batch).await?;
1699 Ok(())
1700 }
1701
1702 #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1703 async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1704 if batch.keys_value_bytes.is_empty() {
1705 return Ok(());
1706 }
1707 let mut futures = Vec::new();
1708 for (root_key, key_values) in batch.keys_value_bytes {
1709 let store = self.database.open_shared(&root_key)?;
1710 futures.push(async move { Self::write_entry(&store, key_values).await });
1711 }
1712 futures::future::try_join_all(futures).await?;
1713 Ok(())
1714 }
1715}
1716
1717impl<Database, C> DbStorage<Database, C> {
1718 fn new(
1719 database: Database,
1720 wasm_runtime: Option<WasmRuntime>,
1721 cache_sizes: StorageCacheConfig,
1722 clock: C,
1723 ) -> Self {
1724 Self {
1725 database: Arc::new(database),
1726 clock,
1727 #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
1729 thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
1730 wasm_runtime,
1731 user_contracts: Arc::new(papaya::HashMap::new()),
1732 user_services: Arc::new(papaya::HashMap::new()),
1733 shared_committees: SharedCommittees::new(),
1734 caches: StorageCaches::new(cache_sizes),
1735 execution_runtime_config: ExecutionRuntimeConfig::default(),
1736 }
1737 }
1738
1739 pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
1741 self.execution_runtime_config.allow_application_logs = allow;
1742 self
1743 }
1744}
1745
1746impl<Database> DbStorage<Database, WallClock>
1747where
1748 Database: KeyValueDatabase + Clone + 'static,
1749 Database::Error: Send + Sync,
1750 Database::Store: KeyValueStore + Clone + 'static,
1751{
1752 pub async fn maybe_create_and_connect(
1754 config: &Database::Config,
1755 namespace: &str,
1756 wasm_runtime: Option<WasmRuntime>,
1757 cache_sizes: StorageCacheConfig,
1758 ) -> Result<Self, Database::Error> {
1759 let database = Database::maybe_create_and_connect(config, namespace).await?;
1760 Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
1761 }
1762
1763 pub async fn connect(
1765 config: &Database::Config,
1766 namespace: &str,
1767 wasm_runtime: Option<WasmRuntime>,
1768 cache_sizes: StorageCacheConfig,
1769 ) -> Result<Self, Database::Error> {
1770 let database = Database::connect(config, namespace).await?;
1771 Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
1772 }
1773}
1774
#[cfg(with_testing)]
impl<Database, C> DbStorage<Database, C>
where
    Database: linera_views::backends::DatabaseBackup,
{
    /// Delegates to the underlying database's `backup_to`, passing `dir` through
    /// unchanged, and returns its result.
    pub fn backup_to(&self, dir: &std::path::Path) -> anyhow::Result<()> {
        self.database.backup_to(dir)
    }
}
1785
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Builds a test storage from the database's test configuration, a freshly
    /// generated test namespace and a new [`TestClock`].
    ///
    /// # Panics
    ///
    /// Panics if the test configuration cannot be obtained or the storage cannot be
    /// created.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Builds a test storage on the database returned by
    /// `Database::recreate_and_connect` for `namespace`, using the default cache
    /// configuration.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        Database::recreate_and_connect(&config, namespace)
            .await
            .map(|database| Self::new(database, wasm_runtime, DEFAULT_STORAGE_CACHE_CONFIG, clock))
    }

    /// Builds a test storage on the database returned by `Database::connect` for
    /// `namespace`, using the default cache configuration.
    pub async fn connect_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        Database::connect(&config, namespace)
            .await
            .map(|database| Self::new(database, wasm_runtime, DEFAULT_STORAGE_CACHE_CONFIG, clock))
    }
}
1839
1840#[cfg(test)]
1841mod tests {
1842 use linera_base::{
1843 crypto::{CryptoHash, TestString},
1844 data_types::{BlockHeight, Epoch, Round, Timestamp},
1845 identifiers::{
1846 ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
1847 StreamName,
1848 },
1849 };
1850 use linera_chain::{
1851 block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
1852 types::ConfirmedBlockCertificate,
1853 };
1854 use linera_views::{
1855 memory::MemoryDatabase,
1856 store::{KeyValueDatabase, ReadableKeyValueStore as _},
1857 };
1858
1859 use crate::{
1860 db_storage::{
1861 to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
1862 EVENT_ID_TAG,
1863 },
1864 DbStorage, Storage, TestClock,
1865 };
1866
1867 #[test]
1874 fn test_root_key_blob_serialization() {
1875 let hash = CryptoHash::default();
1876 let blob_type = BlobType::default();
1877 let blob_id = BlobId::new(hash, blob_type);
1878 let root_key = RootKey::BlobId(blob_id).bytes();
1879 assert_eq!(root_key[0], BLOB_ID_TAG);
1880 assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
1881 }
1882
1883 #[test]
1886 fn test_root_key_chainstate_serialization() {
1887 let hash = CryptoHash::default();
1888 let chain_id = ChainId(hash);
1889 let root_key = RootKey::ChainState(chain_id).bytes();
1890 assert_eq!(root_key[0], CHAIN_ID_TAG);
1891 assert_eq!(
1892 bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
1893 chain_id
1894 );
1895 }
1896
1897 #[test]
1900 fn test_root_key_event_serialization() {
1901 let hash = CryptoHash::test_hash("49");
1902 let chain_id = ChainId(hash);
1903 let application_description_hash = CryptoHash::test_hash("42");
1904 let application_id = ApplicationId::new(application_description_hash);
1905 let application_id = GenericApplicationId::User(application_id);
1906 let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
1907 let stream_id = StreamId {
1908 application_id,
1909 stream_name,
1910 };
1911 let prefix = bcs::to_bytes(&stream_id).unwrap();
1912
1913 let index = 1567;
1914 let event_id = EventId {
1915 chain_id,
1916 stream_id,
1917 index,
1918 };
1919 let root_key = RootKey::Event(chain_id).bytes();
1920 assert_eq!(root_key[0], EVENT_ID_TAG);
1921 let key = to_event_key(&event_id);
1922 assert!(key.starts_with(&prefix));
1923 }
1924
1925 #[test]
1928 fn test_root_key_block_by_height_serialization() {
1929 use linera_base::data_types::BlockHeight;
1930
1931 let hash = CryptoHash::default();
1932 let chain_id = ChainId(hash);
1933 let height = BlockHeight(42);
1934
1935 let root_key = RootKey::BlockByHeight(chain_id).bytes();
1937 let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
1938 assert_eq!(deserialized_chain_id, chain_id);
1939
1940 let height_key = to_height_key(height);
1942 let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
1943 assert_eq!(deserialized_height, height);
1944 }
1945
1946 #[cfg(with_testing)]
1947 #[tokio::test]
1948 async fn test_add_certificate_creates_height_index() {
1949 let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1951
1952 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1954 let height = BlockHeight(5);
1955 let block = Block {
1956 header: BlockHeader {
1957 chain_id,
1958 epoch: Epoch::ZERO,
1959 height,
1960 timestamp: Timestamp::from(0),
1961 state_hash: CryptoHash::new(&TestString::new("state_hash")),
1962 previous_block_hash: None,
1963 authenticated_owner: None,
1964 transactions_hash: CryptoHash::new(&TestString::new("transactions_hash")),
1965 messages_hash: CryptoHash::new(&TestString::new("messages_hash")),
1966 previous_message_blocks_hash: CryptoHash::new(&TestString::new(
1967 "prev_msg_blocks_hash",
1968 )),
1969 previous_event_blocks_hash: CryptoHash::new(&TestString::new(
1970 "prev_event_blocks_hash",
1971 )),
1972 oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_responses_hash")),
1973 events_hash: CryptoHash::new(&TestString::new("events_hash")),
1974 blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
1975 operation_results_hash: CryptoHash::new(&TestString::new("operation_results_hash")),
1976 },
1977 body: BlockBody {
1978 transactions: vec![],
1979 messages: vec![],
1980 previous_message_blocks: Default::default(),
1981 previous_event_blocks: Default::default(),
1982 oracle_responses: vec![],
1983 events: vec![],
1984 blobs: vec![],
1985 operation_results: vec![],
1986 },
1987 };
1988 let confirmed_block = ConfirmedBlock::new(block);
1989 let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1990
1991 let mut batch = MultiPartitionBatch::new();
1993 batch.add_certificate(&certificate).unwrap();
1994 storage.write_batch(batch).await.unwrap();
1995
1996 let hash = certificate.hash();
1998 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1999 let store = storage.database.open_shared(&index_root_key).unwrap();
2000 let height_key = to_height_key(height);
2001 let value_bytes = store.read_value_bytes(&height_key).await.unwrap();
2002
2003 assert!(value_bytes.is_some(), "Height index was not created");
2004 let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
2005 assert_eq!(stored_hash, hash, "Height index contains wrong hash");
2006 }
2007
2008 #[cfg(with_testing)]
2009 #[tokio::test]
2010 async fn test_read_certificates_by_heights() {
2011 let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
2012 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
2013
2014 let mut batch = MultiPartitionBatch::new();
2016 let mut expected_certs = vec![];
2017
2018 for height in [1, 3, 5] {
2019 let block = Block {
2020 header: BlockHeader {
2021 chain_id,
2022 epoch: Epoch::ZERO,
2023 height: BlockHeight(height),
2024 timestamp: Timestamp::from(0),
2025 state_hash: CryptoHash::new(&TestString::new("state_hash_{height}")),
2026 previous_block_hash: None,
2027 authenticated_owner: None,
2028 transactions_hash: CryptoHash::new(&TestString::new("tx_hash_{height}")),
2029 messages_hash: CryptoHash::new(&TestString::new("msg_hash_{height}")),
2030 previous_message_blocks_hash: CryptoHash::new(&TestString::new(
2031 "pmb_hash_{height}",
2032 )),
2033 previous_event_blocks_hash: CryptoHash::new(&TestString::new(
2034 "peb_hash_{height}",
2035 )),
2036 oracle_responses_hash: CryptoHash::new(&TestString::new(
2037 "oracle_hash_{height}",
2038 )),
2039 events_hash: CryptoHash::new(&TestString::new("events_hash_{height}")),
2040 blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_{height}")),
2041 operation_results_hash: CryptoHash::new(&TestString::new(
2042 "op_results_hash_{height}",
2043 )),
2044 },
2045 body: BlockBody {
2046 transactions: vec![],
2047 messages: vec![],
2048 previous_message_blocks: Default::default(),
2049 previous_event_blocks: Default::default(),
2050 oracle_responses: vec![],
2051 events: vec![],
2052 blobs: vec![],
2053 operation_results: vec![],
2054 },
2055 };
2056 let confirmed_block = ConfirmedBlock::new(block);
2057 let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
2058 expected_certs.push((height, cert.clone()));
2059 batch.add_certificate(&cert).unwrap();
2060 }
2061 storage.write_batch(batch).await.unwrap();
2062
2063 let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
2065 let result = storage
2066 .read_certificates_by_heights(chain_id, &heights)
2067 .await
2068 .unwrap();
2069 assert_eq!(result.len(), 3);
2070 assert_eq!(
2071 result[0].as_ref().unwrap().hash(),
2072 expected_certs[0].1.hash()
2073 );
2074 assert_eq!(
2075 result[1].as_ref().unwrap().hash(),
2076 expected_certs[1].1.hash()
2077 );
2078 assert_eq!(
2079 result[2].as_ref().unwrap().hash(),
2080 expected_certs[2].1.hash()
2081 );
2082
2083 let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
2085 let result = storage
2086 .read_certificates_by_heights(chain_id, &heights)
2087 .await
2088 .unwrap();
2089 assert_eq!(result.len(), 3);
2090 assert_eq!(
2091 result[0].as_ref().unwrap().hash(),
2092 expected_certs[2].1.hash()
2093 );
2094 assert_eq!(
2095 result[1].as_ref().unwrap().hash(),
2096 expected_certs[0].1.hash()
2097 );
2098 assert_eq!(
2099 result[2].as_ref().unwrap().hash(),
2100 expected_certs[1].1.hash()
2101 );
2102
2103 let heights = vec![
2105 BlockHeight(1),
2106 BlockHeight(2),
2107 BlockHeight(3),
2108 BlockHeight(3),
2109 ];
2110 let result = storage
2111 .read_certificates_by_heights(chain_id, &heights)
2112 .await
2113 .unwrap();
2114 assert_eq!(result.len(), 4); assert!(result[0].is_some());
2116 assert!(result[1].is_none()); assert!(result[2].is_some());
2118 assert!(result[3].is_some());
2119 assert_eq!(
2120 result[2].as_ref().unwrap().hash(),
2121 result[3].as_ref().unwrap().hash()
2122 ); let heights = vec![];
2126 let result = storage
2127 .read_certificates_by_heights(chain_id, &heights)
2128 .await
2129 .unwrap();
2130 assert_eq!(result.len(), 0);
2131 }
2132
#[cfg(with_testing)]
#[tokio::test]
async fn test_read_certificates_by_heights_multiple_chains() {
    // Builds a confirmed-block certificate for an empty block at height 10 on `chain_id`.
    //
    // Every header hash is derived from the string "<field>_<tag>", so two certificates
    // built with different tags differ in each hashed header field as well as in the
    // chain ID. The certificate uses `Round::Fast` and carries no signatures.
    fn certificate_at_height_10(chain_id: ChainId, tag: &str) -> ConfirmedBlockCertificate {
        let hash_of = |field: &str| {
            CryptoHash::new(&TestString::new(format!("{field}_{tag}").as_str()))
        };
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: hash_of("state_hash"),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: hash_of("tx_hash"),
                messages_hash: hash_of("msg_hash"),
                previous_message_blocks_hash: hash_of("pmb_hash"),
                previous_event_blocks_hash: hash_of("peb_hash"),
                oracle_responses_hash: hash_of("oracle_hash"),
                events_hash: hash_of("events_hash"),
                blobs_hash: hash_of("blobs_hash"),
                operation_results_hash: hash_of("op_results_hash"),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        ConfirmedBlockCertificate::new(ConfirmedBlock::new(block), Round::Fast, vec![])
    }

    let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

    let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
    let chain_b = ChainId(CryptoHash::test_hash("chain_b"));

    // Both chains get a certificate at the *same* height, so a lookup can only return
    // the right one if it is keyed by chain ID as well as by height.
    let cert_a = certificate_at_height_10(chain_a, "a");
    let cert_b = certificate_at_height_10(chain_b, "b");
    // Guard against a vacuous test: the two certificates must be distinguishable.
    assert_ne!(cert_a.hash(), cert_b.hash());

    let mut batch = MultiPartitionBatch::new();
    batch.add_certificate(&cert_a).unwrap();
    batch.add_certificate(&cert_b).unwrap();
    storage.write_batch(batch).await.unwrap();

    // Height 10 on chain A resolves to chain A's certificate.
    let result = storage
        .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
        .await
        .unwrap();
    assert_eq!(result.len(), 1);
    assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

    // Height 10 on chain B resolves to chain B's certificate.
    let result = storage
        .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
        .await
        .unwrap();
    assert_eq!(result.len(), 1);
    assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

    // A height that was never written yields a `None` entry rather than an error.
    let result = storage
        .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
        .await
        .unwrap();
    assert_eq!(result.len(), 1);
    assert!(result[0].is_none());
}
2233
#[cfg(with_testing)]
#[tokio::test]
async fn test_read_certificates_by_heights_consistency() {
    let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
    let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
    let height = BlockHeight(7);

    // Hashes a fixed label so that each header field gets a distinct, deterministic value.
    let digest = |label: &str| CryptoHash::new(&TestString::new(label));

    let header = BlockHeader {
        chain_id,
        epoch: Epoch::ZERO,
        height,
        timestamp: Timestamp::from(0),
        state_hash: digest("state_hash"),
        previous_block_hash: None,
        authenticated_owner: None,
        transactions_hash: digest("tx_hash"),
        messages_hash: digest("msg_hash"),
        previous_message_blocks_hash: digest("pmb_hash"),
        previous_event_blocks_hash: digest("peb_hash"),
        oracle_responses_hash: digest("oracle_hash"),
        events_hash: digest("events_hash"),
        blobs_hash: digest("blobs_hash"),
        operation_results_hash: digest("op_results_hash"),
    };
    // An entirely empty block body.
    let body = BlockBody {
        transactions: Vec::new(),
        messages: Vec::new(),
        previous_message_blocks: Default::default(),
        previous_event_blocks: Default::default(),
        oracle_responses: Vec::new(),
        events: Vec::new(),
        blobs: Vec::new(),
        operation_results: Vec::new(),
    };
    let certificate = ConfirmedBlockCertificate::new(
        ConfirmedBlock::new(Block { header, body }),
        Round::Fast,
        Vec::new(),
    );
    let certificate_hash = certificate.hash();

    // Persist the single certificate.
    let mut write = MultiPartitionBatch::new();
    write.add_certificate(&certificate).unwrap();
    storage.write_batch(write).await.unwrap();

    // Look the certificate up through both access paths: by hash and by (chain, height).
    let via_hash = storage
        .read_certificate(certificate_hash)
        .await
        .unwrap()
        .unwrap();
    let height_lookup = storage
        .read_certificates_by_heights(chain_id, &[height])
        .await
        .unwrap();
    let via_height = height_lookup[0].as_ref().unwrap();

    // Both paths must return the same certificate.
    assert_eq!(via_hash.hash(), via_height.hash());
    assert_eq!(
        via_hash.value().block().header,
        via_height.value().block().header
    );
}
2294}