// linera_storage/db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    fmt::Debug,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_cache::ValueCache;
19use linera_chain::{
20    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
21    ChainStateView,
22};
23use linera_execution::{
24    BlobState, ExecutionRuntimeConfig, SharedCommittees, UserContractCode, UserServiceCode,
25    WasmRuntime,
26};
27use linera_views::{
28    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
29    batch::Batch,
30    context::ViewContext,
31    store::{
32        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
33    },
34    views::View,
35    ViewError,
36};
37use serde::{Deserialize, Serialize};
38use tracing::instrument;
39#[cfg(with_testing)]
40use {
41    futures::channel::oneshot::{self, Receiver},
42    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
43    std::cmp::Reverse,
44};
45
46use crate::{ChainRuntimeContext, Clock, Storage};
47
/// Prometheus metrics for storage operations: read/write counters, cache-vs-DB
/// hit attribution, certificate size histograms, and chain-load latency.
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, linear_bucket_interval,
        register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
    };
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Label name for distinguishing cache hits vs DB reads.
    pub(super) const SOURCE_LABEL: &str = "source";
    /// Label value for items served from the in-memory cache.
    pub(super) const CACHE: &str = "cache";
    /// Label value for items served from the database.
    pub(super) const DB: &str = "db";

    /// The metric counting how often a blob is tested for existence from storage
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_blob",
            "The metric counting how often a blob is written to storage",
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            // Fixed help-text typo: "certificate are" -> "certificates are".
            "The metric counting how often certificates are read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
        )
    });

    /// Serialized size of the lite-certificate component (round + value hash + validator
    /// signatures), observed when a confirmed certificate is written to storage. Bytes are
    /// taken from the already-produced BCS output, so this adds no extra serialization work.
    /// Sized to track the signature component, which is what grows under post-quantum
    /// signature migration (10x for Falcon-512, 38x for ML-DSA-44).
    pub(super) static CERTIFICATE_LITE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_lite_bytes",
            "Serialized size of the lite-certificate (signatures + metadata) in bytes",
            exponential_bucket_interval(128.0, 2_097_152.0),
        )
    });

    /// Serialized size of the certificate value (block payload), observed when a confirmed
    /// certificate is written to storage. Bytes are taken from the already-produced BCS
    /// output. Range matches the gRPC max message size cap.
    pub(super) static CERTIFICATE_VALUE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_value_bytes",
            "Serialized size of the certificate value (block payload) in bytes",
            exponential_bucket_interval(256.0, 16_777_216.0),
        )
    });

    /// Number of validator signatures attached to each confirmed certificate. Linear buckets
    /// because committee size is small (typically under 20) and resolution at single-signer
    /// granularity matters more than range.
    pub(super) static CERTIFICATE_SIGNER_COUNT: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_signer_count",
            "Number of validator signatures attached to each confirmed certificate",
            linear_bucket_interval(1.0, 1.0, 20.0),
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is tested for existence from storage
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_event",
            "The metric counting how often an event is written to storage",
        )
    });

    /// The metric counting how often the network description is read from storage.
    // NOTE(review): the metric name lacks the `read_` prefix used by its write
    // counterpart; renaming would break existing dashboards, so it is left as-is.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
        )
    });
}
273
// Fixed sub-keys used inside a partition. The identifying data (blob ID, block
// hash, ...) lives in the partition's root key, so a single byte suffices here.

/// The key used for blobs. The Blob ID itself is contained in the root key.
const BLOB_KEY: &[u8] = &[0];

/// The key used for blob states. The Blob ID itself is contained in the root key.
const BLOB_STATE_KEY: &[u8] = &[1];

/// The key used for lite certificates. The cryptohash itself is contained in the root key.
const LITE_CERTIFICATE_KEY: &[u8] = &[2];

/// The key used for confirmed blocks. The cryptohash itself is contained in the root key.
const BLOCK_KEY: &[u8] = &[3];

/// The key used for the network description.
const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];
288
289fn get_block_keys() -> Vec<Vec<u8>> {
290    vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
291}
292
/// A batch of writes grouped by partition: every staged key/value pair is
/// recorded under the root key of the partition it belongs to.
#[derive(Default)]
#[expect(clippy::type_complexity)]
struct MultiPartitionBatch {
    // Maps a partition root key to the (key, value) pairs staged for that partition.
    keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
}
298
299impl MultiPartitionBatch {
300    fn new() -> Self {
301        Self::default()
302    }
303
304    fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
305        let entry = self.keys_value_bytes.entry(root_key).or_default();
306        entry.extend(key_values);
307    }
308
309    fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
310        self.put_key_values(root_key, vec![(key, value)]);
311    }
312
313    fn add_blob(&mut self, blob: &Blob) {
314        #[cfg(with_metrics)]
315        metrics::WRITE_BLOB_COUNTER.inc();
316        let root_key = RootKey::BlobId(blob.id()).bytes();
317        let key = BLOB_KEY.to_vec();
318        self.put_key_value(root_key, key, blob.bytes().to_vec());
319    }
320
321    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
322        let root_key = RootKey::BlobId(blob_id).bytes();
323        let key = BLOB_STATE_KEY.to_vec();
324        let value = bcs::to_bytes(blob_state)?;
325        self.put_key_value(root_key, key, value);
326        Ok(())
327    }
328
329    /// Adds a certificate to the batch.
330    ///
331    /// Writes both the certificate data (indexed by hash) and a height index
332    /// (mapping chain_id + height to hash).
333    ///
334    /// Note: If called multiple times with the same `(chain_id, height)`, the height
335    /// index will be overwritten. The caller is responsible for ensuring that
336    /// certificates at the same height have the same hash.
337    fn add_certificate(
338        &mut self,
339        certificate: &ConfirmedBlockCertificate,
340    ) -> Result<(), ViewError> {
341        #[cfg(with_metrics)]
342        {
343            metrics::WRITE_CERTIFICATE_COUNTER.inc();
344            metrics::CERTIFICATE_SIGNER_COUNT.observe(certificate.signatures().len() as f64);
345        }
346        let hash = certificate.hash();
347
348        // Write certificate data by hash
349        let root_key = RootKey::BlockHash(hash).bytes();
350        let mut key_values = Vec::new();
351        let key = LITE_CERTIFICATE_KEY.to_vec();
352        let value = bcs::to_bytes(&certificate.lite_certificate())?;
353        #[cfg(with_metrics)]
354        metrics::CERTIFICATE_LITE_BYTES.observe(value.len() as f64);
355        key_values.push((key, value));
356        let key = BLOCK_KEY.to_vec();
357        let value = bcs::to_bytes(&certificate.value())?;
358        #[cfg(with_metrics)]
359        metrics::CERTIFICATE_VALUE_BYTES.observe(value.len() as f64);
360        key_values.push((key, value));
361        self.put_key_values(root_key, key_values);
362
363        // Write height index: chain_id -> height -> hash
364        let chain_id = certificate.value().block().header.chain_id;
365        let height = certificate.value().block().header.height;
366        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
367        let height_key = to_height_key(height);
368        let index_value = bcs::to_bytes(&hash)?;
369        self.put_key_value(index_root_key, height_key, index_value);
370
371        // Write event block height index: chain_id -> (stream_id, index) -> height
372        let event_index_root_key = RootKey::EventBlockHeight(chain_id).bytes();
373        let height_value = bcs::to_bytes(&height)?;
374        for event in certificate.value().block().body.events.iter().flatten() {
375            let event_key = to_event_key(&EventId {
376                chain_id,
377                stream_id: event.stream_id.clone(),
378                index: event.index,
379            });
380            self.put_key_value(
381                event_index_root_key.clone(),
382                event_key,
383                height_value.clone(),
384            );
385        }
386
387        Ok(())
388    }
389
390    fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
391        #[cfg(with_metrics)]
392        metrics::WRITE_EVENT_COUNTER.inc();
393        let key = to_event_key(event_id);
394        let root_key = RootKey::Event(event_id.chain_id).bytes();
395        self.put_key_value(root_key, key, value);
396    }
397
398    fn add_network_description(
399        &mut self,
400        information: &NetworkDescription,
401    ) -> Result<(), ViewError> {
402        #[cfg(with_metrics)]
403        metrics::WRITE_NETWORK_DESCRIPTION.inc();
404        let root_key = RootKey::NetworkDescription.bytes();
405        let key = NETWORK_DESCRIPTION_KEY.to_vec();
406        let value = bcs::to_bytes(information)?;
407        self.put_key_value(root_key, key, value);
408        Ok(())
409    }
410}
411
/// Individual cache sizes for each `ValueCache` in `DbStorage`.
#[derive(Clone, Copy, Debug)]
pub struct StorageCacheConfig {
    /// Capacity of the blob cache.
    pub blob_cache_size: usize,
    /// Capacity of the confirmed-block cache.
    pub confirmed_block_cache_size: usize,
    /// Capacity of the deserialized certificate cache.
    pub certificate_cache_size: usize,
    /// Capacity of the raw (serialized) certificate cache.
    pub certificate_raw_cache_size: usize,
    /// Capacity of the event cache.
    pub event_cache_size: usize,
    /// Interval in seconds between cache cleanup passes (shared by all caches).
    pub cache_cleanup_interval_secs: u64,
}
422
/// Default cache configuration for testing.
// All capacities are set to the same round number; tests are not expected to be
// sensitive to cache sizing.
#[cfg(with_testing)]
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    blob_cache_size: 1000,
    confirmed_block_cache_size: 1000,
    certificate_cache_size: 1000,
    certificate_raw_cache_size: 1000,
    event_cache_size: 1000,
    cache_cleanup_interval_secs: linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS,
};
433
/// Raw certificate bytes: (lite_certificate_bytes, confirmed_block_bytes).
type RawCertificate = (Vec<u8>, Vec<u8>);

/// Groups all `ValueCache` instances used by `DbStorage`.
///
/// All caches use `ValueCache` which stores values as `Arc<V>` internally,
/// ensuring memory-efficient sharing across consumers. Adding a new cache
/// here automatically inherits Arc-based sharing.
#[derive(Clone)]
pub struct StorageCaches {
    /// Blobs by blob ID.
    pub(crate) blob: Arc<ValueCache<BlobId, Blob>>,
    /// Confirmed blocks by their hash.
    pub(crate) confirmed_block: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Deserialized confirmed certificates by their hash.
    pub(crate) certificate: Arc<ValueCache<CryptoHash, ConfirmedBlockCertificate>>,
    /// Serialized certificate bytes by their hash (see [`RawCertificate`]).
    pub(crate) certificate_raw: Arc<ValueCache<CryptoHash, RawCertificate>>,
    /// Event payload bytes by event ID.
    pub(crate) event: Arc<ValueCache<EventId, Vec<u8>>>,
}
450
451impl StorageCaches {
452    /// Creates all caches with the given sizes.
453    pub fn new(sizes: StorageCacheConfig) -> Self {
454        let interval = sizes.cache_cleanup_interval_secs;
455        Self {
456            blob: Arc::new(ValueCache::new(sizes.blob_cache_size, interval)),
457            confirmed_block: Arc::new(ValueCache::new(sizes.confirmed_block_cache_size, interval)),
458            certificate: Arc::new(ValueCache::new(sizes.certificate_cache_size, interval)),
459            certificate_raw: Arc::new(ValueCache::new(sizes.certificate_raw_cache_size, interval)),
460            event: Arc::new(ValueCache::new(sizes.event_cache_size, interval)),
461        }
462    }
463}
464
/// Main implementation of the [`Storage`] trait.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying key-value database, shared between clones.
    database: Arc<Database>,
    /// Time source; `WallClock` by default, `TestClock` in tests.
    clock: Clock,
    /// Thread pool handed to each chain's runtime context.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// Optional Wasm runtime selection for user applications.
    wasm_runtime: Option<WasmRuntime>,
    /// User contract code by application ID, shared between clones.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// User service code by application ID, shared between clones.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// Committees shared across chains.
    shared_committees: SharedCommittees,
    /// In-memory caches for blobs, blocks, certificates, and events.
    caches: StorageCaches,
    /// Execution runtime configuration, copied into each runtime context.
    execution_runtime_config: ExecutionRuntimeConfig,
}
478
/// Top-level key space of the database: each variant defines a family of
/// partitions (root keys). The serialized variant index becomes the first byte
/// of every root key, so variant order must not change (see the `*_TAG`
/// constants kept in sync with it).
#[derive(Debug, Serialize, Deserialize)]
enum RootKey {
    NetworkDescription,
    BlockExporterState(u32),
    ChainState(ChainId),
    BlockHash(CryptoHash),
    BlobId(BlobId),
    Event(ChainId),
    BlockByHeight(ChainId),
    EventBlockHeight(ChainId),
}
490
// First byte of a BCS-serialized `RootKey` for selected variants (BCS encodes
// the enum variant index first). These MUST stay in sync with the variant order
// of `RootKey`: `ChainState` = 2, `BlobId` = 4, `Event` = 5.
const CHAIN_ID_TAG: u8 = 2;
const BLOB_ID_TAG: u8 = 4;
const EVENT_ID_TAG: u8 = 5;
494
impl RootKey {
    /// BCS-serializes the root key. Serialization of these variants is expected
    /// to be infallible, hence the `unwrap`.
    fn bytes(&self) -> Vec<u8> {
        bcs::to_bytes(self).unwrap()
    }
}
500
/// The chain-local part of an [`EventId`]: the chain ID is omitted because it
/// is already encoded in the partition root key (see `RootKey::Event` and
/// `RootKey::EventBlockHeight`).
#[derive(Debug, Serialize, Deserialize)]
struct RestrictedEventId {
    pub stream_id: StreamId,
    pub index: u32,
}
506
507fn to_event_key(event_id: &EventId) -> Vec<u8> {
508    let restricted_event_id = RestrictedEventId {
509        stream_id: event_id.stream_id.clone(),
510        index: event_id.index,
511    };
512    bcs::to_bytes(&restricted_event_id).unwrap()
513}
514
/// Builds the per-partition key for a block height in the height index.
// NOTE(review): BCS encodes integers in little-endian, so these keys do not
// sort lexicographically by height — confirm no caller relies on ordered
// range scans over this index.
pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
    bcs::to_bytes(&height).unwrap()
}
518
519fn is_chain_state(root_key: &[u8]) -> bool {
520    if root_key.is_empty() {
521        return false;
522    }
523    root_key[0] == CHAIN_ID_TAG
524}
525
/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
/// chain states into the first store.
// Unit struct: the assignment is a pure function of the root key, so no state
// is needed.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
530
531impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
532    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
533        if root_key.is_empty() {
534            return Ok(StoreInUse::Second);
535        }
536        let store = match is_chain_state(root_key) {
537            true => StoreInUse::First,
538            false => StoreInUse::Second,
539        };
540        Ok(store)
541    }
542}
543
544/// A `Clock` implementation using the system clock.
545#[derive(Clone)]
546pub struct WallClock;
547
548#[cfg_attr(not(web), async_trait)]
549#[cfg_attr(web, async_trait(?Send))]
550impl Clock for WallClock {
551    fn current_time(&self) -> Timestamp {
552        Timestamp::now()
553    }
554
555    async fn sleep_until(&self, timestamp: Timestamp) {
556        let delta = timestamp.delta_since(Timestamp::now());
557        if delta > TimeDelta::ZERO {
558            linera_base::time::timer::sleep(delta.as_duration()).await
559        }
560    }
561}
562
/// Shared state behind a [`TestClock`]: the simulated time plus the pending
/// sleepers waiting for it to advance.
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    // The current simulated time.
    time: Timestamp,
    // Pending sleepers, keyed by `Reverse(target)` so the map is ordered by
    // descending target time; several sleepers may share a target.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional callback that decides whether to auto-advance for a given target timestamp.
    /// Returns `true` if the clock should auto-advance to that time.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
572
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the simulated time and wakes every sleeper whose target is now due.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are `Reverse(Timestamp)`, so `split_off(&Reverse(time))` removes
        // and returns exactly the entries whose target time is `<= time` — the
        // sleepers that should wake up now.
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        }
    }

    /// Registers a sleeper for `time` and returns the receiver it should await.
    /// Completes immediately if the target is already reached, or if the
    /// configured callback chooses to auto-advance the clock to it.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let should_auto_advance = self
            .sleep_callback
            .as_ref()
            .is_some_and(|callback| callback(time));
        if should_auto_advance && time > self.time {
            // Auto-advance mode: immediately advance the clock and complete the sleep.
            self.set(time);
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else if self.time >= time {
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
604
/// A clock implementation that uses a stored number of microseconds and that can be updated
/// explicitly. All clones share the same time, and setting it in one clone updates all the others.
// The `Arc<Mutex<..>>` is what makes clones share state.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
610
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Reads the shared simulated time.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the simulated clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // Register the wake-up while holding the lock, then await outside it.
        let wake_up = self.lock().add_sleep_until(timestamp);
        // The sender may be gone if the clock was dropped; stop waiting either way.
        let _ = wake_up.await;
    }
}
625
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by the specified delta.
    pub fn add(&self, delta: TimeDelta) {
        let mut guard = self.lock();
        let time = guard.time.saturating_add(delta);
        guard.set(time);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Sets a callback that decides whether to auto-advance for each sleep call.
    ///
    /// The callback receives the target timestamp and should return `true` if the clock
    /// should auto-advance to that time, or `false` if the sleep should block normally.
    // The redundant inner `#[cfg(with_testing)]` was removed: the whole `impl`
    // block is already gated by that cfg.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        self.lock().sleep_callback = Some(Box::new(callback));
    }

    /// Locks the shared inner state; panics only if a previous holder panicked.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
666
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl<Database, C> Storage for DbStorage<Database, C>
where
    Database: KeyValueDatabase<
            Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
            Error: Send + Sync,
        > + Clone
        + linera_base::util::traits::AutoTraits
        + 'static,
    C: Clock + Clone + Send + Sync + 'static,
{
    // View context used for chain states, backed by this database's store type.
    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
    // The configured time source (wall clock in production, test clock in tests).
    type Clock = C;
    // View context for the block exporter, keyed by exporter index.
    type BlockExporterContext = ViewContext<u32, Database::Store>;

    /// Returns the clock this storage was configured with.
    fn clock(&self) -> &C {
        &self.clock
    }

    /// Returns the shared execution thread pool.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }

    /// Returns the committees shared across chains.
    fn shared_committees(&self) -> &SharedCommittees {
        &self.shared_committees
    }
694
    /// Loads the state view of the given chain from its own partition.
    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
    async fn load_chain(
        &self,
        chain_id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, ViewError> {
        // Measure the end-to-end load latency (metrics builds only).
        #[cfg(with_metrics)]
        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
        // Bundle everything the chain's execution environment needs.
        let runtime_context = ChainRuntimeContext {
            storage: self.clone(),
            thread_pool: self.thread_pool.clone(),
            chain_id,
            execution_runtime_config: self.execution_runtime_config,
            user_contracts: self.user_contracts.clone(),
            user_services: self.user_services.clone(),
        };
        // Each chain has its own partition; it is opened with `open_exclusive`
        // (unlike the shared opens used for read-only lookups below).
        let root_key = RootKey::ChainState(chain_id).bytes();
        let store = self.database.open_exclusive(&root_key)?;
        let context = ViewContext::create_root_context(store, runtime_context).await?;
        ChainStateView::load(context).await
    }
715
    /// Tests whether a blob exists, consulting the cache before the database.
    #[instrument(level = "trace", skip_all, fields(%blob_id))]
    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
        // Fast path: a cached blob is known to exist; no DB round-trip needed.
        if self.caches.blob.contains(&blob_id) {
            #[cfg(with_metrics)]
            metrics::CONTAINS_BLOB_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(true);
        }
        // Slow path: probe the blob's partition for the blob data key.
        let root_key = RootKey::BlobId(blob_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let test = store.contains_key(BLOB_KEY).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOB_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        Ok(test)
    }
734
    /// Returns the subset of `blob_ids` that is neither cached nor stored,
    /// preserving input order.
    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
        let mut missing_blobs = Vec::new();
        #[cfg(with_metrics)]
        let mut cache_hits: u64 = 0;
        #[cfg(with_metrics)]
        let mut db_checks: u64 = 0;
        for blob_id in blob_ids {
            // A cache hit proves existence; skip the DB probe.
            if self.caches.blob.contains(blob_id) {
                #[cfg(with_metrics)]
                {
                    cache_hits += 1;
                }
                continue;
            }
            #[cfg(with_metrics)]
            {
                db_checks += 1;
            }
            // NOTE(review): DB probes run sequentially, one await per blob;
            // consider issuing them concurrently if this shows up in profiles.
            let root_key = RootKey::BlobId(*blob_id).bytes();
            let store = self.database.open_shared(&root_key)?;
            if !store.contains_key(BLOB_KEY).await? {
                missing_blobs.push(*blob_id);
            }
        }
        // Counters are accumulated locally and flushed once per label.
        #[cfg(with_metrics)]
        {
            if cache_hits > 0 {
                metrics::CONTAINS_BLOBS_COUNTER
                    .with_label_values(&[metrics::CACHE])
                    .inc_by(cache_hits);
            }
            if db_checks > 0 {
                metrics::CONTAINS_BLOBS_COUNTER
                    .with_label_values(&[metrics::DB])
                    .inc_by(db_checks);
            }
        }
        Ok(missing_blobs)
    }
775
    /// Tests whether a blob state exists for the given blob ID.
    #[instrument(skip_all, fields(%blob_id))]
    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
        // Blob states have no dedicated cache (see `StorageCaches`), so this
        // always goes to the database.
        let root_key = RootKey::BlobId(blob_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let test = store.contains_key(BLOB_STATE_KEY).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOB_STATE_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        Ok(test)
    }
787
    /// Reads the confirmed block with the given hash, consulting the cache first
    /// and populating it on a DB hit.
    #[instrument(skip_all, fields(%hash))]
    async fn read_confirmed_block(
        &self,
        hash: CryptoHash,
    ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError> {
        // Fast path: serve the block from the in-memory cache.
        if let Some(block) = self.caches.confirmed_block.get(&hash) {
            #[cfg(with_metrics)]
            metrics::READ_CONFIRMED_BLOCK_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(Some(block));
        }
        // Slow path: read from the block's hash-keyed partition.
        let root_key = RootKey::BlockHash(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let value = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await?;
        #[cfg(with_metrics)]
        metrics::READ_CONFIRMED_BLOCK_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        match value {
            // `insert` returns the cached `Arc`, shared with future readers.
            Some(block) => Ok(Some(self.caches.confirmed_block.insert(&hash, block))),
            None => Ok(None),
        }
    }
812
    /// Reads several confirmed blocks by hash, returning results aligned with
    /// the input order; absent blocks yield `None`.
    #[instrument(skip_all)]
    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError> {
        let hashes = hashes.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        // First pass: satisfy what we can from the cache, remembering the
        // positions of the misses so results stay aligned with the input.
        let mut results = vec![None; hashes.len()];
        let mut misses = Vec::new();
        for (i, hash) in hashes.iter().enumerate() {
            if let Some(block) = self.caches.confirmed_block.get(hash) {
                results[i] = Some(block);
            } else {
                misses.push(i);
            }
        }
        // Second pass: read each miss from its partition and fill the cache.
        // Blocks not found in the DB simply stay `None`.
        if !misses.is_empty() {
            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
                let store = self.database.open_shared(&root_key)?;
                if let Some(block) = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await? {
                    results[*miss_idx] = Some(
                        self.caches
                            .confirmed_block
                            .insert(&hashes[*miss_idx], block),
                    );
                }
            }
        }
        #[cfg(with_metrics)]
        {
            // Attribute each requested hash to exactly one source label.
            let cache_hits = (hashes.len() - misses.len()) as u64;
            if cache_hits > 0 {
                metrics::READ_CONFIRMED_BLOCKS_COUNTER
                    .with_label_values(&[metrics::CACHE])
                    .inc_by(cache_hits);
            }
            let db_reads = misses.len() as u64;
            if db_reads > 0 {
                metrics::READ_CONFIRMED_BLOCKS_COUNTER
                    .with_label_values(&[metrics::DB])
                    .inc_by(db_reads);
            }
        }
        Ok(results)
    }
862
863    #[instrument(skip_all, fields(%blob_id))]
864    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError> {
865        if let Some(blob) = self.caches.blob.get(&blob_id) {
866            #[cfg(with_metrics)]
867            metrics::READ_BLOB_COUNTER
868                .with_label_values(&[metrics::CACHE])
869                .inc();
870            return Ok(Some(blob));
871        }
872        let root_key = RootKey::BlobId(blob_id).bytes();
873        let store = self.database.open_shared(&root_key)?;
874        let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
875        #[cfg(with_metrics)]
876        metrics::READ_BLOB_COUNTER
877            .with_label_values(&[metrics::DB])
878            .inc();
879        match maybe_blob_bytes {
880            Some(blob_bytes) => {
881                let blob = Blob::new_with_id_unchecked(blob_id, blob_bytes);
882                Ok(Some(self.caches.blob.insert(&blob_id, blob)))
883            }
884            None => Ok(None),
885        }
886    }
887
888    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
889    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError> {
890        if blob_ids.is_empty() {
891            return Ok(Vec::new());
892        }
893        // Each blob lives under its own root_key (partition), so cross-partition
894        // reads can't be coalesced into a single IN query. The ScyllaDB best
895        // practice is parallel queries via the shard-aware driver, which routes
896        // each query to the right shard on the right node. RocksDB benefits too:
897        // concurrent point lookups let the scheduler overlap cache/SST reads.
898        futures::future::try_join_all(blob_ids.iter().map(|blob_id| self.read_blob(*blob_id))).await
899    }
900
901    #[instrument(skip_all, fields(%blob_id))]
902    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
903        let root_key = RootKey::BlobId(blob_id).bytes();
904        let store = self.database.open_shared(&root_key)?;
905        let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
906        #[cfg(with_metrics)]
907        metrics::READ_BLOB_STATE_COUNTER
908            .with_label_values(&[metrics::DB])
909            .inc();
910        Ok(blob_state)
911    }
912
913    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
914    async fn read_blob_states(
915        &self,
916        blob_ids: &[BlobId],
917    ) -> Result<Vec<Option<BlobState>>, ViewError> {
918        if blob_ids.is_empty() {
919            return Ok(Vec::new());
920        }
921        futures::future::try_join_all(
922            blob_ids
923                .iter()
924                .map(|blob_id| self.read_blob_state(*blob_id)),
925        )
926        .await
927    }
928
929    #[instrument(skip_all, fields(blob_id = %blob.id()))]
930    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
931        let mut batch = MultiPartitionBatch::new();
932        batch.add_blob(blob);
933        self.write_batch(batch).await?;
934        Ok(())
935    }
936
937    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
938    async fn maybe_write_blob_states(
939        &self,
940        blob_ids: &[BlobId],
941        blob_state: BlobState,
942    ) -> Result<(), ViewError> {
943        if blob_ids.is_empty() {
944            return Ok(());
945        }
946        let mut maybe_blob_states = Vec::new();
947        for blob_id in blob_ids {
948            let root_key = RootKey::BlobId(*blob_id).bytes();
949            let store = self.database.open_shared(&root_key)?;
950            let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
951            maybe_blob_states.push(maybe_blob_state);
952        }
953        let mut batch = MultiPartitionBatch::new();
954        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
955            match maybe_blob_state {
956                None => {
957                    batch.add_blob_state(*blob_id, &blob_state)?;
958                }
959                Some(state) => {
960                    if state.epoch < blob_state.epoch {
961                        batch.add_blob_state(*blob_id, &blob_state)?;
962                    }
963                }
964            }
965        }
966        // We tolerate race conditions because two active chains are likely to
967        // be both from the latest epoch, and otherwise failing to pick the
968        // more recent blob state has limited impact.
969        self.write_batch(batch).await?;
970        Ok(())
971    }
972
973    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
974    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
975        if blobs.is_empty() {
976            return Ok(Vec::new());
977        }
978        let mut batch = MultiPartitionBatch::new();
979        let mut blob_states = Vec::new();
980        for blob in blobs {
981            let root_key = RootKey::BlobId(blob.id()).bytes();
982            let store = self.database.open_shared(&root_key)?;
983            let has_state = store.contains_key(BLOB_STATE_KEY).await?;
984            blob_states.push(has_state);
985            if has_state {
986                batch.add_blob(blob);
987            }
988        }
989        self.write_batch(batch).await?;
990        Ok(blob_states)
991    }
992
993    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
994    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
995        if blobs.is_empty() {
996            return Ok(());
997        }
998        let mut batch = MultiPartitionBatch::new();
999        for blob in blobs {
1000            batch.add_blob(blob);
1001        }
1002        self.write_batch(batch).await
1003    }
1004
1005    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1006    async fn write_blobs_and_certificate(
1007        &self,
1008        blobs: &[Blob],
1009        certificate: &ConfirmedBlockCertificate,
1010    ) -> Result<(), ViewError> {
1011        let mut batch = MultiPartitionBatch::new();
1012        for blob in blobs {
1013            batch.add_blob(blob);
1014        }
1015        batch.add_certificate(certificate)?;
1016        self.write_batch(batch).await
1017    }
1018
1019    fn cache_certificate(
1020        &self,
1021        certificate: ConfirmedBlockCertificate,
1022    ) -> Arc<ConfirmedBlockCertificate> {
1023        self.caches
1024            .certificate
1025            .insert(&certificate.hash(), certificate)
1026    }
1027
1028    fn cache_blob(&self, blob: Blob) -> Arc<Blob> {
1029        self.caches.blob.insert(&blob.id(), blob)
1030    }
1031
1032    fn cache_confirmed_block(&self, block: ConfirmedBlock) -> Arc<ConfirmedBlock> {
1033        self.caches.confirmed_block.insert(&block.hash(), block)
1034    }
1035
1036    #[instrument(skip_all, fields(%hash))]
1037    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
1038        if self.caches.certificate.contains(&hash) || self.caches.certificate_raw.contains(&hash) {
1039            #[cfg(with_metrics)]
1040            metrics::CONTAINS_CERTIFICATE_COUNTER
1041                .with_label_values(&[metrics::CACHE])
1042                .inc();
1043            return Ok(true);
1044        }
1045        let root_key = RootKey::BlockHash(hash).bytes();
1046        let store = self.database.open_shared(&root_key)?;
1047        let results = store.contains_keys(&get_block_keys()).await?;
1048        #[cfg(with_metrics)]
1049        metrics::CONTAINS_CERTIFICATE_COUNTER
1050            .with_label_values(&[metrics::DB])
1051            .inc();
1052        Ok(results[0] && results[1])
1053    }
1054
1055    #[instrument(skip_all, fields(%hash))]
1056    async fn read_certificate(
1057        &self,
1058        hash: CryptoHash,
1059    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
1060        // Assembled certificate cache (single Arc, no re-assembly)
1061        if let Some(cert) = self.caches.certificate.get(&hash) {
1062            #[cfg(with_metrics)]
1063            metrics::READ_CERTIFICATE_COUNTER
1064                .with_label_values(&[metrics::CACHE])
1065                .inc();
1066            return Ok(Some(cert));
1067        }
1068        // Raw bytes cache — deserialize + populate caches
1069        if let Some(raw) = self.caches.certificate_raw.get(&hash) {
1070            #[cfg(with_metrics)]
1071            metrics::READ_CERTIFICATE_COUNTER
1072                .with_label_values(&[metrics::CACHE])
1073                .inc();
1074            return self.deserialize_and_cache_certificate(&raw.0, &raw.1);
1075        }
1076        // DB
1077        let root_key = RootKey::BlockHash(hash).bytes();
1078        let store = self.database.open_shared(&root_key)?;
1079        let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1080        #[cfg(with_metrics)]
1081        metrics::READ_CERTIFICATE_COUNTER
1082            .with_label_values(&[metrics::DB])
1083            .inc();
1084        let Some(lite_cert_bytes) = values[0].as_ref() else {
1085            return Ok(None);
1086        };
1087        let Some(confirmed_block_bytes) = values[1].as_ref() else {
1088            return Ok(None);
1089        };
1090        self.caches.certificate_raw.insert(
1091            &hash,
1092            (lite_cert_bytes.clone(), confirmed_block_bytes.clone()),
1093        );
1094        self.deserialize_and_cache_certificate(lite_cert_bytes, confirmed_block_bytes)
1095    }
1096
1097    #[instrument(skip_all)]
1098    async fn read_certificates(
1099        &self,
1100        hashes: &[CryptoHash],
1101    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1102        let raw_certs = self.read_certificates_raw(hashes).await?;
1103
1104        raw_certs
1105            .into_iter()
1106            .map(|maybe_raw| {
1107                let Some(raw) = maybe_raw else {
1108                    return Ok(None);
1109                };
1110                self.deserialize_and_cache_certificate(&raw.0, &raw.1)
1111            })
1112            .collect()
1113    }
1114
1115    #[instrument(skip_all)]
1116    async fn read_certificates_raw(
1117        &self,
1118        hashes: &[CryptoHash],
1119    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1120        if hashes.is_empty() {
1121            return Ok(Vec::new());
1122        }
1123        let mut results = vec![None; hashes.len()];
1124        let mut misses = Vec::new();
1125        for (i, hash) in hashes.iter().enumerate() {
1126            if let Some(raw) = self.caches.certificate_raw.get(hash) {
1127                results[i] = Some(raw);
1128            } else {
1129                misses.push(i);
1130            }
1131        }
1132        if !misses.is_empty() {
1133            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
1134            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
1135            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
1136                let store = self.database.open_shared(&root_key)?;
1137                let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1138                if let (Some(lite), Some(block)) = (values[0].as_ref(), values[1].as_ref()) {
1139                    results[*miss_idx] = Some(
1140                        self.caches
1141                            .certificate_raw
1142                            .insert(&hashes[*miss_idx], (lite.clone(), block.clone())),
1143                    );
1144                }
1145            }
1146        }
1147        #[cfg(with_metrics)]
1148        {
1149            let cache_hits = (hashes.len() - misses.len()) as u64;
1150            if cache_hits > 0 {
1151                metrics::READ_CERTIFICATES_COUNTER
1152                    .with_label_values(&[metrics::CACHE])
1153                    .inc_by(cache_hits);
1154            }
1155            let db_reads = misses.len() as u64;
1156            if db_reads > 0 {
1157                metrics::READ_CERTIFICATES_COUNTER
1158                    .with_label_values(&[metrics::DB])
1159                    .inc_by(db_reads);
1160            }
1161        }
1162        Ok(results)
1163    }
1164
1165    async fn read_certificate_hashes_by_heights(
1166        &self,
1167        chain_id: ChainId,
1168        heights: &[BlockHeight],
1169    ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1170        if heights.is_empty() {
1171            return Ok(Vec::new());
1172        }
1173
1174        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1175        let store = self.database.open_shared(&index_root_key)?;
1176        let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
1177        let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
1178        let hash_options: Vec<Option<CryptoHash>> = hash_bytes
1179            .into_iter()
1180            .map(|opt| {
1181                opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
1182                    .transpose()
1183            })
1184            .collect::<Result<_, _>>()?;
1185
1186        Ok(hash_options)
1187    }
1188
1189    async fn read_event_block_heights(
1190        &self,
1191        event_ids: &[EventId],
1192    ) -> Result<Vec<Option<BlockHeight>>, ViewError> {
1193        if event_ids.is_empty() {
1194            return Ok(Vec::new());
1195        }
1196        // Group event IDs by chain ID for batch lookups per partition.
1197        let mut chain_groups = BTreeMap::<_, Vec<_>>::new();
1198        for (i, event_id) in event_ids.iter().enumerate() {
1199            chain_groups
1200                .entry(event_id.chain_id)
1201                .or_default()
1202                .push((i, to_event_key(event_id)));
1203        }
1204        let mut results = vec![None; event_ids.len()];
1205        for (chain_id, entries) in chain_groups {
1206            let root_key = RootKey::EventBlockHeight(chain_id).bytes();
1207            let store = self.database.open_shared(&root_key)?;
1208            let keys = entries
1209                .iter()
1210                .map(|(_, key)| key.clone())
1211                .collect::<Vec<_>>();
1212            let values = store.read_multi_values_bytes(&keys).await?;
1213            for ((original_index, _), value) in entries.into_iter().zip(values) {
1214                if let Some(bytes) = value {
1215                    results[original_index] = Some(bcs::from_bytes::<BlockHeight>(&bytes)?);
1216                }
1217            }
1218        }
1219        Ok(results)
1220    }
1221
1222    #[instrument(skip_all)]
1223    async fn read_certificates_by_heights_raw(
1224        &self,
1225        chain_id: ChainId,
1226        heights: &[BlockHeight],
1227    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1228        let hashes: Vec<Option<CryptoHash>> = self
1229            .read_certificate_hashes_by_heights(chain_id, heights)
1230            .await?;
1231
1232        // Map from hash to all indices in the heights array (handles duplicates)
1233        let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
1234        for (index, maybe_hash) in hashes.iter().enumerate() {
1235            if let Some(hash) = maybe_hash {
1236                indices.entry(*hash).or_default().push(index);
1237            }
1238        }
1239
1240        // Deduplicate hashes for the storage query
1241        let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
1242
1243        let mut result = vec![None; heights.len()];
1244
1245        for (raw_cert, hash) in self
1246            .read_certificates_raw(&unique_hashes)
1247            .await?
1248            .into_iter()
1249            .zip(unique_hashes)
1250        {
1251            if let Some(idx_list) = indices.get(&hash) {
1252                for &index in idx_list {
1253                    result[index] = raw_cert.clone();
1254                }
1255            } else {
1256                // This should not happen, but log a warning if it does.
1257                tracing::error!(?hash, "certificate hash not found in indices map",);
1258            }
1259        }
1260
1261        Ok(result)
1262    }
1263
1264    #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1265    async fn read_certificates_by_heights(
1266        &self,
1267        chain_id: ChainId,
1268        heights: &[BlockHeight],
1269    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1270        self.read_certificates_by_heights_raw(chain_id, heights)
1271            .await?
1272            .into_iter()
1273            .map(|maybe_raw| match maybe_raw {
1274                None => Ok(None),
1275                Some(raw) => self.deserialize_and_cache_certificate(&raw.0, &raw.1),
1276            })
1277            .collect()
1278    }
1279
1280    #[instrument(skip_all, fields(event_id = ?event_id))]
1281    async fn read_event(&self, event_id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError> {
1282        if let Some(event) = self.caches.event.get(&event_id) {
1283            #[cfg(with_metrics)]
1284            metrics::READ_EVENT_COUNTER
1285                .with_label_values(&[metrics::CACHE])
1286                .inc();
1287            return Ok(Some(event));
1288        }
1289        let event_key = to_event_key(&event_id);
1290        let root_key = RootKey::Event(event_id.chain_id).bytes();
1291        let store = self.database.open_shared(&root_key)?;
1292        let event = store.read_value_bytes(&event_key).await?;
1293        #[cfg(with_metrics)]
1294        metrics::READ_EVENT_COUNTER
1295            .with_label_values(&[metrics::DB])
1296            .inc();
1297        match event {
1298            Some(event_bytes) => Ok(Some(self.caches.event.insert(&event_id, event_bytes))),
1299            None => Ok(None),
1300        }
1301    }
1302
1303    #[instrument(skip_all, fields(event_id = ?event_id))]
1304    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1305        if self.caches.event.contains(&event_id) {
1306            #[cfg(with_metrics)]
1307            metrics::CONTAINS_EVENT_COUNTER
1308                .with_label_values(&[metrics::CACHE])
1309                .inc();
1310            return Ok(true);
1311        }
1312        let event_key = to_event_key(&event_id);
1313        let root_key = RootKey::Event(event_id.chain_id).bytes();
1314        let store = self.database.open_shared(&root_key)?;
1315        let exists = store.contains_key(&event_key).await?;
1316        #[cfg(with_metrics)]
1317        metrics::CONTAINS_EVENT_COUNTER
1318            .with_label_values(&[metrics::DB])
1319            .inc();
1320        Ok(exists)
1321    }
1322
1323    #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
1324    async fn read_events_from_index(
1325        &self,
1326        chain_id: &ChainId,
1327        stream_id: &StreamId,
1328        start_index: u32,
1329    ) -> Result<Vec<IndexAndEvent>, ViewError> {
1330        let root_key = RootKey::Event(*chain_id).bytes();
1331        let store = self.database.open_shared(&root_key)?;
1332        let mut keys = Vec::new();
1333        let mut indices = Vec::new();
1334        let prefix = bcs::to_bytes(stream_id).unwrap();
1335        for short_key in store.find_keys_by_prefix(&prefix).await? {
1336            let index = bcs::from_bytes::<u32>(&short_key)?;
1337            if index >= start_index {
1338                let mut key = prefix.clone();
1339                key.extend(short_key);
1340                keys.push(key);
1341                indices.push(index);
1342            }
1343        }
1344        let values = store.read_multi_values_bytes(&keys).await?;
1345        let mut returned_values = Vec::new();
1346        for (index, value) in indices.into_iter().zip(values) {
1347            let event = value.unwrap();
1348            returned_values.push(IndexAndEvent { index, event });
1349        }
1350        Ok(returned_values)
1351    }
1352
1353    #[instrument(skip_all)]
1354    async fn write_events(
1355        &self,
1356        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1357    ) -> Result<(), ViewError> {
1358        let mut batch = MultiPartitionBatch::new();
1359        for (event_id, value) in events {
1360            batch.add_event(&event_id, value);
1361        }
1362        self.write_batch(batch).await
1363    }
1364
1365    #[instrument(skip_all)]
1366    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1367        let root_key = RootKey::NetworkDescription.bytes();
1368        let store = self.database.open_shared(&root_key)?;
1369        let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1370        #[cfg(with_metrics)]
1371        metrics::READ_NETWORK_DESCRIPTION
1372            .with_label_values(&[metrics::DB])
1373            .inc();
1374        Ok(maybe_value)
1375    }
1376
1377    #[instrument(skip_all)]
1378    async fn write_network_description(
1379        &self,
1380        information: &NetworkDescription,
1381    ) -> Result<(), ViewError> {
1382        let mut batch = MultiPartitionBatch::new();
1383        batch.add_network_description(information)?;
1384        self.write_batch(batch).await?;
1385        Ok(())
1386    }
1387
    /// Returns the Wasm runtime configured for this storage, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1391
1392    #[instrument(skip_all)]
1393    async fn block_exporter_context(
1394        &self,
1395        block_exporter_id: u32,
1396    ) -> Result<Self::BlockExporterContext, ViewError> {
1397        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1398        let store = self.database.open_exclusive(&root_key)?;
1399        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1400    }
1401
1402    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1403        let root_keys = self.database.list_root_keys().await?;
1404        let mut blob_ids = Vec::new();
1405        for root_key in root_keys {
1406            if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1407                let root_key_red = &root_key[1..];
1408                let blob_id = bcs::from_bytes(root_key_red)?;
1409                blob_ids.push(blob_id);
1410            }
1411        }
1412        Ok(blob_ids)
1413    }
1414
1415    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1416        let root_keys = self.database.list_root_keys().await?;
1417        let mut chain_ids = Vec::new();
1418        for root_key in root_keys {
1419            if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1420                let root_key_red = &root_key[1..];
1421                let chain_id = bcs::from_bytes(root_key_red)?;
1422                chain_ids.push(chain_id);
1423            }
1424        }
1425        Ok(chain_ids)
1426    }
1427
1428    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
1429        let root_keys = self.database.list_root_keys().await?;
1430        let mut event_ids = Vec::new();
1431        for root_key in root_keys {
1432            if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
1433                let root_key_red = &root_key[1..];
1434                let chain_id = bcs::from_bytes(root_key_red)?;
1435                let store = self.database.open_shared(&root_key)?;
1436                let keys = store.find_keys_by_prefix(&[]).await?;
1437                for key in keys {
1438                    let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
1439                    let event_id = EventId {
1440                        chain_id,
1441                        stream_id: restricted_event_id.stream_id,
1442                        index: restricted_event_id.index,
1443                    };
1444                    event_ids.push(event_id);
1445                }
1446            }
1447        }
1448        Ok(event_ids)
1449    }
1450}
1451
1452impl<Database, C> DbStorage<Database, C>
1453where
1454    Database: KeyValueDatabase + Clone,
1455    Database::Store: KeyValueStore + Clone,
1456    C: Clock,
1457    Database::Error: Send + Sync,
1458{
1459    #[instrument(skip_all)]
1460    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1461        hashes
1462            .iter()
1463            .map(|hash| RootKey::BlockHash(*hash).bytes())
1464            .collect()
1465    }
1466
1467    fn deserialize_and_cache_certificate(
1468        &self,
1469        lite_cert_bytes: &[u8],
1470        confirmed_block_bytes: &[u8],
1471    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
1472        let lite = bcs::from_bytes::<LiteCertificate>(lite_cert_bytes)?;
1473        let block = bcs::from_bytes::<ConfirmedBlock>(confirmed_block_bytes)?;
1474        let hash = block.hash();
1475        self.caches.confirmed_block.insert(&hash, block.clone());
1476        let certificate = lite
1477            .with_value(block)
1478            .ok_or(ViewError::InconsistentEntries)?;
1479        let arc = self.caches.certificate.insert(&hash, certificate);
1480        Ok(Some(arc))
1481    }
1482
1483    #[instrument(skip_all)]
1484    async fn write_entry(
1485        store: &Database::Store,
1486        key_values: Vec<(Vec<u8>, Vec<u8>)>,
1487    ) -> Result<(), ViewError> {
1488        let mut batch = Batch::new();
1489        for (key, value) in key_values {
1490            batch.put_key_value_bytes(key, value);
1491        }
1492        store.write_batch(batch).await?;
1493        Ok(())
1494    }
1495
1496    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1497    async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1498        if batch.keys_value_bytes.is_empty() {
1499            return Ok(());
1500        }
1501        let mut futures = Vec::new();
1502        for (root_key, key_values) in batch.keys_value_bytes {
1503            let store = self.database.open_shared(&root_key)?;
1504            futures.push(async move { Self::write_entry(&store, key_values).await });
1505        }
1506        futures::future::try_join_all(futures).await?;
1507        Ok(())
1508    }
1509}
1510
impl<Database, C> DbStorage<Database, C> {
    /// Assembles a storage instance around an already-connected database,
    /// initializing caches, thread pool, and runtime configuration.
    fn new(
        database: Database,
        wasm_runtime: Option<WasmRuntime>,
        cache_sizes: StorageCacheConfig,
        clock: C,
    ) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            // The `Arc` here is required on native but useless on the Web.
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            shared_committees: SharedCommittees::new(),
            caches: StorageCaches::new(cache_sizes),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Sets whether contract log messages should be output.
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1539
1540impl<Database> DbStorage<Database, WallClock>
1541where
1542    Database: KeyValueDatabase + Clone + 'static,
1543    Database::Error: Send + Sync,
1544    Database::Store: KeyValueStore + Clone + 'static,
1545{
1546    pub async fn maybe_create_and_connect(
1547        config: &Database::Config,
1548        namespace: &str,
1549        wasm_runtime: Option<WasmRuntime>,
1550        cache_sizes: StorageCacheConfig,
1551    ) -> Result<Self, Database::Error> {
1552        let database = Database::maybe_create_and_connect(config, namespace).await?;
1553        Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
1554    }
1555
1556    pub async fn connect(
1557        config: &Database::Config,
1558        namespace: &str,
1559        wasm_runtime: Option<WasmRuntime>,
1560        cache_sizes: StorageCacheConfig,
1561    ) -> Result<Self, Database::Error> {
1562        let database = Database::connect(config, namespace).await?;
1563        Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
1564    }
1565}
1566
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Builds a storage over a brand-new test namespace, panicking on failure.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Recreates `namespace` from scratch and wraps it in a storage with the
    /// default cache configuration and the given test clock.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        Ok(Self::new(
            database,
            wasm_runtime,
            DEFAULT_STORAGE_CACHE_CONFIG,
            clock,
        ))
    }
}
1602
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::{CryptoHash, TestString},
        data_types::{BlockHeight, Epoch, Round, Timestamp},
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };
    use linera_chain::{
        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
        types::ConfirmedBlockCertificate,
    };
    use linera_views::{
        memory::MemoryDatabase,
        store::{KeyValueDatabase, ReadableKeyValueStore as _},
    };

    use crate::{
        db_storage::{
            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
            EVENT_ID_TAG,
        },
        DbStorage, Storage, TestClock,
    };

    // Several functionalities of the storage rely on the way that the serialization
    // is done. Thus we need to check that the serialization works in the way that
    // we expect.

    /// Builds an empty test block for `chain_id` at `height`. All content hashes are
    /// derived from `tag`, so different tags yield blocks with different hashes.
    fn make_block(chain_id: ChainId, height: BlockHeight, tag: &str) -> Block {
        // Each header hash field gets a value unique to (field, tag).
        let hash = |field: &str| CryptoHash::new(&TestString::new(&format!("{field}_{tag}")));
        Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: hash("state_hash"),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: hash("tx_hash"),
                messages_hash: hash("msg_hash"),
                previous_message_blocks_hash: hash("pmb_hash"),
                previous_event_blocks_hash: hash("peb_hash"),
                oracle_responses_hash: hash("oracle_hash"),
                events_hash: hash("events_hash"),
                blobs_hash: hash("blobs_hash"),
                operation_results_hash: hash("op_results_hash"),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        }
    }

    /// Builds a confirmed block certificate (without signatures) over an empty test
    /// block for `chain_id` at `height`, with hashes derived from `tag`.
    fn make_certificate(
        chain_id: ChainId,
        height: BlockHeight,
        tag: &str,
    ) -> ConfirmedBlockCertificate {
        let confirmed_block = ConfirmedBlock::new(make_block(chain_id, height, tag));
        ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![])
    }

    // The listing of the blobs in `list_blob_ids` depends on the serialization
    // of `RootKey::Blob`.
    #[test]
    fn test_root_key_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let root_key = RootKey::BlobId(blob_id).bytes();
        // The root key must be the tag byte followed by the BCS-serialized `BlobId`.
        assert_eq!(root_key[0], BLOB_ID_TAG);
        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
    }

    // The listing of the chains in `list_chain_ids` depends on the serialization
    // of `RootKey::ChainState`.
    #[test]
    fn test_root_key_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let root_key = RootKey::ChainState(chain_id).bytes();
        // The root key must be the tag byte followed by the BCS-serialized `ChainId`.
        assert_eq!(root_key[0], CHAIN_ID_TAG);
        assert_eq!(
            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
            chain_id
        );
    }

    // The listing of the events in `read_events_from_index` depends on the
    // serialization of `RootKey::Event`.
    #[test]
    fn test_root_key_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let prefix = bcs::to_bytes(&stream_id).unwrap();

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        let root_key = RootKey::Event(chain_id).bytes();
        assert_eq!(root_key[0], EVENT_ID_TAG);
        // Within a chain's partition, event keys must be prefixed by the serialized
        // stream id so that a prefix scan lists events of one stream.
        let key = to_event_key(&event_id);
        assert!(key.starts_with(&prefix));
    }

    // The height index lookup depends on the serialization of `RootKey::BlockByHeight`
    // and `to_height_key`, following the same pattern as `Event`.
    #[test]
    fn test_root_key_block_by_height_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let height = BlockHeight(42);

        // `RootKey::BlockByHeight` uses only the `ChainId` for partitioning (like `Event`).
        let root_key = RootKey::BlockByHeight(chain_id).bytes();
        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
        assert_eq!(deserialized_chain_id, chain_id);

        // The height is encoded as a key within the partition (like the index in `Event`).
        let height_key = to_height_key(height);
        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
        assert_eq!(deserialized_height, height);
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Write a certificate at a specific height.
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        let certificate = make_certificate(chain_id, height, "index");
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Verify that the height index was created (following the `Event` pattern):
        // the chain's `BlockByHeight` partition maps the height key to the
        // certificate hash.
        let hash = certificate.hash();
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();

        assert!(value_bytes.is_some(), "Height index was not created");
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write certificates at heights 1, 3 and 5, each with distinct content.
        let mut batch = MultiPartitionBatch::new();
        let mut expected_certs = vec![];
        for height in [1, 3, 5] {
            let cert = make_certificate(chain_id, BlockHeight(height), &height.to_string());
            expected_certs.push(cert.clone());
            batch.add_certificate(&cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();

        // Reading in order [1, 3, 5] returns the certificates in the same order.
        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        for (certificate, expected) in result.iter().zip(&expected_certs) {
            assert_eq!(certificate.as_ref().unwrap().hash(), expected.hash());
        }

        // Reading out of order [5, 1, 3] returns the certificates in query order.
        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].as_ref().unwrap().hash(), expected_certs[2].hash());
        assert_eq!(result[1].as_ref().unwrap().hash(), expected_certs[0].hash());
        assert_eq!(result[2].as_ref().unwrap().hash(), expected_certs[1].hash());

        // Reading [1, 2, 3, 3]: height 2 is missing and height 3 is duplicated.
        let heights = vec![
            BlockHeight(1),
            BlockHeight(2),
            BlockHeight(3),
            BlockHeight(3),
        ];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 4); // One entry per queried height, duplicates included.
        assert!(result[0].is_some());
        assert!(result[1].is_none()); // Height 2 doesn't exist.
        assert!(result[2].is_some());
        assert!(result[3].is_some());
        // Both of the last two entries correspond to height 3.
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            result[3].as_ref().unwrap().hash()
        );

        // An empty list of heights yields an empty result.
        let result = storage
            .read_certificates_by_heights(chain_id, &[])
            .await
            .unwrap();
        assert!(result.is_empty());
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_multiple_chains() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Write certificates for two different chains at the same height.
        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));
        let cert_a = make_certificate(chain_a, BlockHeight(10), "a");
        let cert_b = make_certificate(chain_b, BlockHeight(10), "b");

        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&cert_a).unwrap();
        batch.add_certificate(&cert_b).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Reading from chain A returns certificate A.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

        // Reading from chain B returns certificate B.
        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

        // Reading a height that was never written on chain A returns `None`.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
            .await
            .unwrap();
        assert!(result[0].is_none());
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_consistency() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write a single certificate.
        let cert = make_certificate(chain_id, BlockHeight(7), "consistency");
        let hash = cert.hash();
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&cert).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Read it back both by hash and by height.
        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();
        let certs_by_height = storage
            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
            .await
            .unwrap();
        let cert_by_height = certs_by_height[0].as_ref().unwrap();

        // Both access paths must return the same certificate.
        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
        assert_eq!(
            cert_by_hash.value().block().header,
            cert_by_height.value().block().header
        );
    }
}