Skip to main content

linera_storage/
db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    fmt::Debug,
7    sync::{Arc, OnceLock},
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_cache::{Arc as CacheArc, ValueCache};
19use linera_chain::{
20    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
21    ChainStateView,
22};
23use linera_execution::{
24    BlobState, ExecutionRuntimeConfig, SharedCommittees, UserContractCode, UserServiceCode,
25    WasmRuntime,
26};
27use linera_views::{
28    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
29    batch::Batch,
30    context::ViewContext,
31    store::{
32        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
33    },
34    views::View,
35    ViewError,
36};
37use serde::{Deserialize, Serialize};
38use tracing::{debug, instrument};
39#[cfg(with_testing)]
40use {
41    futures::channel::oneshot::{self, Receiver},
42    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
43    std::cmp::Reverse,
44};
45
46use crate::{ChainRuntimeContext, Clock, Storage};
47
/// Prometheus metrics for storage operations.
///
/// Most read/contains counters carry a [`SOURCE_LABEL`] label whose value is either
/// [`CACHE`] (served from an in-memory cache) or [`DB`] (served by the database).
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, linear_bucket_interval,
        register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
    };
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Label name for distinguishing cache hits vs DB reads.
    pub(super) const SOURCE_LABEL: &str = "source";
    /// Label value for items served from the in-memory cache.
    pub(super) const CACHE: &str = "cache";
    /// Label value for items served from the database.
    pub(super) const DB: &str = "db";

    /// The metric counting how often a blob is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_blob",
            "The metric counting how often a blob is written to storage",
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificates are read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
        )
    });

    /// Serialized size of the lite-certificate component (round + value hash + validator
    /// signatures), observed when a confirmed certificate is written to storage. Bytes are
    /// taken from the already-produced BCS output, so this adds no extra serialization work.
    /// Sized to track the signature component, which is what grows under post-quantum
    /// signature migration (10x for Falcon-512, 38x for ML-DSA-44).
    pub(super) static CERTIFICATE_LITE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_lite_bytes",
            "Serialized size of the lite-certificate (signatures + metadata) in bytes",
            exponential_bucket_interval(128.0, 2_097_152.0),
        )
    });

    /// Serialized size of the certificate value (block payload), observed when a confirmed
    /// certificate is written to storage. Bytes are taken from the already-produced BCS
    /// output. Range matches the gRPC max message size cap.
    pub(super) static CERTIFICATE_VALUE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_value_bytes",
            "Serialized size of the certificate value (block payload) in bytes",
            exponential_bucket_interval(256.0, 16_777_216.0),
        )
    });

    /// Number of validator signatures attached to each confirmed certificate. Linear buckets
    /// because committee size is small (typically under 20) and resolution at single-signer
    /// granularity matters more than range.
    pub(super) static CERTIFICATE_SIGNER_COUNT: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "certificate_signer_count",
            "Number of validator signatures attached to each confirmed certificate",
            linear_bucket_interval(1.0, 1.0, 20.0),
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is tested for existence from storage.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_event",
            "The metric counting how often an event is written to storage",
        )
    });

    /// The metric counting how often a block hash is read by height from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOCK_HASH_BY_HEIGHT_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_block_hash_by_height",
                "The metric counting how often a block hash is read by height from storage",
                &[SOURCE_LABEL],
            )
        });

    /// The metric counting how often an event block height is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_BLOCK_HEIGHT_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_event_block_height",
                "The metric counting how often an event block height is read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// The metric counting how often the network description is read from storage.
    ///
    /// NOTE(review): registered as `network_description` (not `read_network_description`,
    /// unlike its write counterpart); renaming it may break existing queries, so it is kept.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
        )
    });
}
296
297/// The key used for blobs. The Blob ID itself is contained in the root key.
298const BLOB_KEY: &[u8] = &[0];
299
300/// The key used for blob states. The Blob ID itself is contained in the root key.
301const BLOB_STATE_KEY: &[u8] = &[1];
302
303/// The key used for lite certificates. The cryptohash itself is contained in the root key.
304const LITE_CERTIFICATE_KEY: &[u8] = &[2];
305
306/// The key used for confirmed blocks. The cryptohash itself is contained in the root key.
307const BLOCK_KEY: &[u8] = &[3];
308
309/// The key used for the network description.
310const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];
311
312fn get_block_keys() -> Vec<Vec<u8>> {
313    vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
314}
315
316#[derive(Default)]
317#[expect(clippy::type_complexity)]
318struct MultiPartitionBatch {
319    keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
320}
321
322impl MultiPartitionBatch {
323    fn new() -> Self {
324        Self::default()
325    }
326
327    fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
328        let entry = self.keys_value_bytes.entry(root_key).or_default();
329        entry.extend(key_values);
330    }
331
332    fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
333        self.put_key_values(root_key, vec![(key, value)]);
334    }
335
336    fn add_blob(&mut self, blob: &Blob) {
337        #[cfg(with_metrics)]
338        metrics::WRITE_BLOB_COUNTER.inc();
339        let root_key = RootKey::BlobId(blob.id()).bytes();
340        let key = BLOB_KEY.to_vec();
341        self.put_key_value(root_key, key, blob.bytes().to_vec());
342    }
343
344    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
345        let root_key = RootKey::BlobId(blob_id).bytes();
346        let key = BLOB_STATE_KEY.to_vec();
347        let value = bcs::to_bytes(blob_state)?;
348        self.put_key_value(root_key, key, value);
349        Ok(())
350    }
351
352    /// Adds a certificate to the batch.
353    ///
354    /// Writes both the certificate data (indexed by hash) and a height index
355    /// (mapping chain_id + height to hash).
356    ///
357    /// Note: If called multiple times with the same `(chain_id, height)`, the height
358    /// index will be overwritten. The caller is responsible for ensuring that
359    /// certificates at the same height have the same hash.
360    fn add_certificate(
361        &mut self,
362        certificate: &ConfirmedBlockCertificate,
363    ) -> Result<(), ViewError> {
364        #[cfg(with_metrics)]
365        {
366            metrics::WRITE_CERTIFICATE_COUNTER.inc();
367            metrics::CERTIFICATE_SIGNER_COUNT.observe(certificate.signatures().len() as f64);
368        }
369        let hash = certificate.hash();
370
371        // Write certificate data by hash
372        let root_key = RootKey::BlockHash(hash).bytes();
373        let mut key_values = Vec::new();
374        let key = LITE_CERTIFICATE_KEY.to_vec();
375        let value = bcs::to_bytes(&certificate.lite_certificate())?;
376        #[cfg(with_metrics)]
377        metrics::CERTIFICATE_LITE_BYTES.observe(value.len() as f64);
378        key_values.push((key, value));
379        let key = BLOCK_KEY.to_vec();
380        let value = bcs::to_bytes(&certificate.value())?;
381        #[cfg(with_metrics)]
382        metrics::CERTIFICATE_VALUE_BYTES.observe(value.len() as f64);
383        key_values.push((key, value));
384        self.put_key_values(root_key, key_values);
385
386        // Write height index: chain_id -> height -> hash
387        let chain_id = certificate.value().block().header.chain_id;
388        let height = certificate.value().block().header.height;
389        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
390        let height_key = to_height_key(height);
391        let index_value = bcs::to_bytes(&hash)?;
392        self.put_key_value(index_root_key, height_key, index_value);
393
394        // Write event block height index: chain_id -> (stream_id, index) -> height
395        let event_index_root_key = RootKey::EventBlockHeight(chain_id).bytes();
396        let height_value = bcs::to_bytes(&height)?;
397        for event in certificate.value().block().body.events.iter().flatten() {
398            let event_key = to_event_key(&EventId {
399                chain_id,
400                stream_id: event.stream_id.clone(),
401                index: event.index,
402            });
403            self.put_key_value(
404                event_index_root_key.clone(),
405                event_key,
406                height_value.clone(),
407            );
408        }
409
410        Ok(())
411    }
412
413    fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
414        #[cfg(with_metrics)]
415        metrics::WRITE_EVENT_COUNTER.inc();
416        let key = to_event_key(event_id);
417        let root_key = RootKey::Event(event_id.chain_id).bytes();
418        self.put_key_value(root_key, key, value);
419    }
420
421    fn add_network_description(
422        &mut self,
423        information: &NetworkDescription,
424    ) -> Result<(), ViewError> {
425        #[cfg(with_metrics)]
426        metrics::WRITE_NETWORK_DESCRIPTION.inc();
427        let root_key = RootKey::NetworkDescription.bytes();
428        let key = NETWORK_DESCRIPTION_KEY.to_vec();
429        let value = bcs::to_bytes(information)?;
430        self.put_key_value(root_key, key, value);
431        Ok(())
432    }
433}
434
435/// Individual cache sizes for each `ValueCache` in `DbStorage`.
436#[derive(Clone, Copy, Debug)]
437pub struct StorageCacheConfig {
438    /// The maximum number of blobs to cache.
439    pub blob_cache_size: usize,
440    /// The maximum number of confirmed blocks to cache.
441    pub confirmed_block_cache_size: usize,
442    /// The maximum number of assembled certificates to cache.
443    pub certificate_cache_size: usize,
444    /// The maximum number of raw (serialized) certificates to cache.
445    pub certificate_raw_cache_size: usize,
446    /// The maximum number of events to cache.
447    pub event_cache_size: usize,
448    /// The maximum number of block hashes to cache, keyed by `(chain, height)`.
449    pub block_hash_by_height_cache_size: usize,
450    /// The maximum number of event-to-block-height index entries to cache.
451    pub event_block_height_cache_size: usize,
452    /// The interval, in seconds, between cache cleanup passes.
453    pub cache_cleanup_interval_secs: u64,
454}
455
456/// Default cache configuration for testing.
457#[cfg(with_testing)]
458pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
459    blob_cache_size: 1000,
460    confirmed_block_cache_size: 1000,
461    certificate_cache_size: 1000,
462    certificate_raw_cache_size: 1000,
463    event_cache_size: 1000,
464    block_hash_by_height_cache_size: 1000,
465    event_block_height_cache_size: 1000,
466    cache_cleanup_interval_secs: linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS,
467};
468
469/// Raw certificate bytes: (lite_certificate_bytes, confirmed_block_bytes).
470type RawCertificate = (Vec<u8>, Vec<u8>);
471
472/// Groups all `ValueCache` instances used by `DbStorage`.
473///
474/// All caches use `ValueCache` which stores values as `Arc<V>` internally,
475/// ensuring memory-efficient sharing across consumers. Adding a new cache
476/// here automatically inherits Arc-based sharing.
477#[derive(Clone)]
478pub struct StorageCaches {
479    pub(crate) blob: Arc<ValueCache<BlobId, Blob>>,
480    pub(crate) confirmed_block: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
481    pub(crate) certificate: Arc<ValueCache<CryptoHash, ConfirmedBlockCertificate>>,
482    pub(crate) certificate_raw: Arc<ValueCache<CryptoHash, RawCertificate>>,
483    pub(crate) event: Arc<ValueCache<EventId, Vec<u8>>>,
484    pub(crate) block_hash_by_height: Arc<ValueCache<(ChainId, BlockHeight), CryptoHash>>,
485    pub(crate) event_block_height: Arc<ValueCache<EventId, BlockHeight>>,
486    pub(crate) network_description: Arc<OnceLock<NetworkDescription>>,
487}
488
489impl StorageCaches {
490    /// Creates all caches with the given sizes.
491    pub fn new(sizes: StorageCacheConfig) -> Self {
492        let interval = sizes.cache_cleanup_interval_secs;
493        Self {
494            blob: Arc::new(ValueCache::new(
495                "storage_blob",
496                sizes.blob_cache_size,
497                interval,
498            )),
499            confirmed_block: Arc::new(ValueCache::new(
500                "storage_confirmed_block",
501                sizes.confirmed_block_cache_size,
502                interval,
503            )),
504            certificate: Arc::new(ValueCache::new(
505                "storage_certificate",
506                sizes.certificate_cache_size,
507                interval,
508            )),
509            certificate_raw: Arc::new(ValueCache::new(
510                "storage_certificate_raw",
511                sizes.certificate_raw_cache_size,
512                interval,
513            )),
514            event: Arc::new(ValueCache::new(
515                "storage_event",
516                sizes.event_cache_size,
517                interval,
518            )),
519            block_hash_by_height: Arc::new(ValueCache::new(
520                "storage_block_hash_by_height",
521                sizes.block_hash_by_height_cache_size,
522                interval,
523            )),
524            event_block_height: Arc::new(ValueCache::new(
525                "storage_event_block_height",
526                sizes.event_block_height_cache_size,
527                interval,
528            )),
529            network_description: Arc::new(OnceLock::new()),
530        }
531    }
532}
533
534/// Main implementation of the [`Storage`] trait.
535#[derive(Clone)]
536pub struct DbStorage<Database, Clock = WallClock> {
537    database: Arc<Database>,
538    clock: Clock,
539    thread_pool: Arc<linera_execution::ThreadPool>,
540    wasm_runtime: Option<WasmRuntime>,
541    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
542    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
543    shared_committees: SharedCommittees,
544    caches: StorageCaches,
545    execution_runtime_config: ExecutionRuntimeConfig,
546}
547
548/// The partition key under which a group of related entries is stored.
549#[derive(Debug, Serialize, Deserialize)]
550pub enum RootKey {
551    /// The network description.
552    NetworkDescription,
553    /// The state of a block exporter, keyed by its ID.
554    BlockExporterState(u32),
555    /// The state of a chain.
556    ChainState(ChainId),
557    /// A certificate and confirmed block, keyed by block hash.
558    BlockHash(CryptoHash),
559    /// A blob and its state, keyed by blob ID.
560    BlobId(BlobId),
561    /// The events of a chain.
562    Event(ChainId),
563    /// The block-height-to-hash index of a chain.
564    BlockByHeight(ChainId),
565    /// The event-to-block-height index of a chain.
566    EventBlockHeight(ChainId),
567}
568
569const CHAIN_ID_TAG: u8 = 2;
570const BLOB_ID_TAG: u8 = 4;
571const EVENT_ID_TAG: u8 = 5;
572
573impl RootKey {
574    /// Returns the serialized bytes of this root key.
575    pub fn bytes(&self) -> Vec<u8> {
576        bcs::to_bytes(self).unwrap()
577    }
578}
579
580#[derive(Debug, Serialize, Deserialize)]
581struct RestrictedEventId {
582    pub stream_id: StreamId,
583    pub index: u32,
584}
585
586fn to_event_key(event_id: &EventId) -> Vec<u8> {
587    let restricted_event_id = RestrictedEventId {
588        stream_id: event_id.stream_id.clone(),
589        index: event_id.index,
590    };
591    bcs::to_bytes(&restricted_event_id).unwrap()
592}
593
594pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
595    bcs::to_bytes(&height).unwrap()
596}
597
598fn is_chain_state(root_key: &[u8]) -> bool {
599    if root_key.is_empty() {
600        return false;
601    }
602    root_key[0] == CHAIN_ID_TAG
603}
604
605/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
606/// chain states into the first store.
607#[derive(Clone, Copy)]
608pub struct ChainStatesFirstAssignment;
609
610impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
611    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
612        if root_key.is_empty() {
613            return Ok(StoreInUse::Second);
614        }
615        let store = match is_chain_state(root_key) {
616            true => StoreInUse::First,
617            false => StoreInUse::Second,
618        };
619        Ok(store)
620    }
621}
622
623/// A `Clock` implementation using the system clock.
624#[derive(Clone)]
625pub struct WallClock;
626
627#[cfg_attr(not(web), async_trait)]
628#[cfg_attr(web, async_trait(?Send))]
629impl Clock for WallClock {
630    fn current_time(&self) -> Timestamp {
631        Timestamp::now()
632    }
633
634    async fn sleep_until(&self, timestamp: Timestamp) {
635        let delta = timestamp.delta_since(Timestamp::now());
636        if delta > TimeDelta::ZERO {
637            linera_base::time::timer::sleep(delta.as_duration()).await
638        }
639    }
640}
641
642#[cfg(with_testing)]
643#[derive(Default)]
644struct TestClockInner {
645    time: Timestamp,
646    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
647    /// Optional callback that decides whether to auto-advance for a given target timestamp.
648    /// Returns `true` if the clock should auto-advance to that time.
649    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
650}
651
652#[cfg(with_testing)]
653impl TestClockInner {
654    fn set(&mut self, time: Timestamp) {
655        self.time = time;
656        let senders = self.sleeps.split_off(&Reverse(time));
657        for sender in senders.into_values().flatten() {
658            // Receiver may have been dropped if the sleep was cancelled.
659            sender.send(()).ok();
660        }
661    }
662
663    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
664        let (sender, receiver) = oneshot::channel();
665        let should_auto_advance = self
666            .sleep_callback
667            .as_ref()
668            .is_some_and(|callback| callback(time));
669        if should_auto_advance && time > self.time {
670            // Auto-advance mode: immediately advance the clock and complete the sleep.
671            self.set(time);
672            // Receiver may have been dropped if the sleep was cancelled.
673            sender.send(()).ok();
674        } else if self.time >= time {
675            // Receiver may have been dropped if the sleep was cancelled.
676            sender.send(()).ok();
677        } else {
678            self.sleeps.entry(Reverse(time)).or_default().push(sender);
679        }
680        receiver
681    }
682}
683
684/// A clock implementation that uses a stored number of microseconds and that can be updated
685/// explicitly. All clones share the same time, and setting it in one clone updates all the others.
686#[cfg(with_testing)]
687#[derive(Clone, Default)]
688pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
689
690#[cfg(with_testing)]
691#[cfg_attr(not(web), async_trait)]
692#[cfg_attr(web, async_trait(?Send))]
693impl Clock for TestClock {
694    fn current_time(&self) -> Timestamp {
695        self.lock().time
696    }
697
698    async fn sleep_until(&self, timestamp: Timestamp) {
699        let receiver = self.lock().add_sleep_until(timestamp);
700        // Sender may have been dropped if the clock was dropped; just stop waiting.
701        receiver.await.ok();
702    }
703}
704
705#[cfg(with_testing)]
706impl TestClock {
707    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
708    pub fn new() -> Self {
709        TestClock(Arc::default())
710    }
711
712    /// Sets the current time.
713    pub fn set(&self, time: Timestamp) {
714        self.lock().set(time);
715    }
716
717    /// Advances the current time by the specified delta.
718    pub fn add(&self, delta: TimeDelta) {
719        let mut guard = self.lock();
720        let time = guard.time.saturating_add(delta);
721        guard.set(time);
722    }
723
724    /// Returns the current time according to the test clock.
725    pub fn current_time(&self) -> Timestamp {
726        self.lock().time
727    }
728
729    /// Sets a callback that decides whether to auto-advance for each sleep call.
730    ///
731    /// The callback receives the target timestamp and should return `true` if the clock
732    /// should auto-advance to that time, or `false` if the sleep should block normally.
733    #[cfg(with_testing)]
734    pub fn set_sleep_callback<F>(&self, callback: F)
735    where
736        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
737    {
738        self.lock().sleep_callback = Some(Box::new(callback));
739    }
740
741    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
742        self.0.lock().expect("poisoned TestClock mutex")
743    }
744}
745
746#[cfg_attr(not(web), async_trait)]
747#[cfg_attr(web, async_trait(?Send))]
748impl<Database, C> Storage for DbStorage<Database, C>
749where
750    Database: KeyValueDatabase<
751            Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
752            Error: Send + Sync,
753        > + Clone
754        + linera_base::util::traits::AutoTraits
755        + 'static,
756    C: Clock + Clone + Send + Sync + 'static,
757{
758    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
759    type Clock = C;
760    type BlockExporterContext = ViewContext<u32, Database::Store>;
761
762    fn clock(&self) -> &C {
763        &self.clock
764    }
765
766    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
767        &self.thread_pool
768    }
769
770    fn shared_committees(&self) -> &SharedCommittees {
771        &self.shared_committees
772    }
773
774    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
775    async fn load_chain(
776        &self,
777        chain_id: ChainId,
778    ) -> Result<ChainStateView<Self::Context>, ViewError> {
779        #[cfg(with_metrics)]
780        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
781        let runtime_context = ChainRuntimeContext {
782            storage: self.clone(),
783            thread_pool: self.thread_pool.clone(),
784            chain_id,
785            execution_runtime_config: self.execution_runtime_config,
786            user_contracts: self.user_contracts.clone(),
787            user_services: self.user_services.clone(),
788        };
789        let root_key = RootKey::ChainState(chain_id).bytes();
790        let store = self.database.open_exclusive(&root_key)?;
791        let context = ViewContext::create_root_context(store, runtime_context).await?;
792        ChainStateView::load(context).await
793    }
794
795    #[instrument(level = "trace", skip_all, fields(%blob_id))]
796    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
797        if self.caches.blob.contains(&blob_id) {
798            #[cfg(with_metrics)]
799            metrics::CONTAINS_BLOB_COUNTER
800                .with_label_values(&[metrics::CACHE])
801                .inc();
802            return Ok(true);
803        }
804        let root_key = RootKey::BlobId(blob_id).bytes();
805        let store = self.database.open_shared(&root_key)?;
806        let test = store.contains_key(BLOB_KEY).await?;
807        #[cfg(with_metrics)]
808        metrics::CONTAINS_BLOB_COUNTER
809            .with_label_values(&[metrics::DB])
810            .inc();
811        Ok(test)
812    }
813
814    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
815    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
816        let mut missing_blobs = Vec::new();
817        #[cfg(with_metrics)]
818        let mut cache_hits: u64 = 0;
819        #[cfg(with_metrics)]
820        let mut db_checks: u64 = 0;
821        for blob_id in blob_ids {
822            if self.caches.blob.contains(blob_id) {
823                #[cfg(with_metrics)]
824                {
825                    cache_hits += 1;
826                }
827                continue;
828            }
829            #[cfg(with_metrics)]
830            {
831                db_checks += 1;
832            }
833            let root_key = RootKey::BlobId(*blob_id).bytes();
834            let store = self.database.open_shared(&root_key)?;
835            if !store.contains_key(BLOB_KEY).await? {
836                missing_blobs.push(*blob_id);
837            }
838        }
839        #[cfg(with_metrics)]
840        {
841            if cache_hits > 0 {
842                metrics::CONTAINS_BLOBS_COUNTER
843                    .with_label_values(&[metrics::CACHE])
844                    .inc_by(cache_hits);
845            }
846            if db_checks > 0 {
847                metrics::CONTAINS_BLOBS_COUNTER
848                    .with_label_values(&[metrics::DB])
849                    .inc_by(db_checks);
850            }
851        }
852        Ok(missing_blobs)
853    }
854
855    #[instrument(skip_all, fields(%blob_id))]
856    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
857        let root_key = RootKey::BlobId(blob_id).bytes();
858        let store = self.database.open_shared(&root_key)?;
859        let test = store.contains_key(BLOB_STATE_KEY).await?;
860        #[cfg(with_metrics)]
861        metrics::CONTAINS_BLOB_STATE_COUNTER
862            .with_label_values(&[metrics::DB])
863            .inc();
864        Ok(test)
865    }
866
867    #[instrument(skip_all, fields(%hash))]
868    async fn read_confirmed_block(
869        &self,
870        hash: CryptoHash,
871    ) -> Result<Option<CacheArc<ConfirmedBlock>>, ViewError> {
872        if let Some(block) = self.caches.confirmed_block.get(&hash) {
873            #[cfg(with_metrics)]
874            metrics::READ_CONFIRMED_BLOCK_COUNTER
875                .with_label_values(&[metrics::CACHE])
876                .inc();
877            return Ok(Some(block));
878        }
879        let root_key = RootKey::BlockHash(hash).bytes();
880        let store = self.database.open_shared(&root_key)?;
881        let value = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await?;
882        #[cfg(with_metrics)]
883        metrics::READ_CONFIRMED_BLOCK_COUNTER
884            .with_label_values(&[metrics::DB])
885            .inc();
886        match value {
887            Some(block) => Ok(Some(self.caches.confirmed_block.insert(&hash, block))),
888            None => Ok(None),
889        }
890    }
891
892    #[instrument(skip_all)]
893    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
894        &self,
895        hashes: I,
896    ) -> Result<Vec<Option<CacheArc<ConfirmedBlock>>>, ViewError> {
897        let hashes = hashes.into_iter().collect::<Vec<_>>();
898        if hashes.is_empty() {
899            return Ok(Vec::new());
900        }
901        let mut results = vec![None; hashes.len()];
902        let mut misses = Vec::new();
903        for (i, hash) in hashes.iter().enumerate() {
904            if let Some(block) = self.caches.confirmed_block.get(hash) {
905                results[i] = Some(block);
906            } else {
907                misses.push(i);
908            }
909        }
910        if !misses.is_empty() {
911            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
912            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
913            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
914                let store = self.database.open_shared(&root_key)?;
915                if let Some(block) = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await? {
916                    results[*miss_idx] = Some(
917                        self.caches
918                            .confirmed_block
919                            .insert(&hashes[*miss_idx], block),
920                    );
921                }
922            }
923        }
924        #[cfg(with_metrics)]
925        {
926            let cache_hits = (hashes.len() - misses.len()) as u64;
927            if cache_hits > 0 {
928                metrics::READ_CONFIRMED_BLOCKS_COUNTER
929                    .with_label_values(&[metrics::CACHE])
930                    .inc_by(cache_hits);
931            }
932            let db_reads = misses.len() as u64;
933            if db_reads > 0 {
934                metrics::READ_CONFIRMED_BLOCKS_COUNTER
935                    .with_label_values(&[metrics::DB])
936                    .inc_by(db_reads);
937            }
938        }
939        Ok(results)
940    }
941
942    #[instrument(skip_all, fields(%blob_id))]
943    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<CacheArc<Blob>>, ViewError> {
944        if let Some(blob) = self.caches.blob.get(&blob_id) {
945            #[cfg(with_metrics)]
946            metrics::READ_BLOB_COUNTER
947                .with_label_values(&[metrics::CACHE])
948                .inc();
949            return Ok(Some(blob));
950        }
951        let root_key = RootKey::BlobId(blob_id).bytes();
952        let store = self.database.open_shared(&root_key)?;
953        let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
954        #[cfg(with_metrics)]
955        metrics::READ_BLOB_COUNTER
956            .with_label_values(&[metrics::DB])
957            .inc();
958        match maybe_blob_bytes {
959            Some(blob_bytes) => {
960                let blob = Blob::new_with_id_unchecked(blob_id, blob_bytes);
961                Ok(Some(self.caches.blob.insert(&blob_id, blob)))
962            }
963            None => Ok(None),
964        }
965    }
966
967    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
968    async fn read_blobs(
969        &self,
970        blob_ids: &[BlobId],
971    ) -> Result<Vec<Option<CacheArc<Blob>>>, ViewError> {
972        if blob_ids.is_empty() {
973            return Ok(Vec::new());
974        }
975        // Each blob lives under its own root_key (partition), so cross-partition
976        // reads can't be coalesced into a single IN query. The ScyllaDB best
977        // practice is parallel queries via the shard-aware driver, which routes
978        // each query to the right shard on the right node. RocksDB benefits too:
979        // concurrent point lookups let the scheduler overlap cache/SST reads.
980        futures::future::try_join_all(blob_ids.iter().map(|blob_id| self.read_blob(*blob_id))).await
981    }
982
983    #[instrument(skip_all, fields(%blob_id))]
984    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
985        let root_key = RootKey::BlobId(blob_id).bytes();
986        let store = self.database.open_shared(&root_key)?;
987        let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
988        #[cfg(with_metrics)]
989        metrics::READ_BLOB_STATE_COUNTER
990            .with_label_values(&[metrics::DB])
991            .inc();
992        Ok(blob_state)
993    }
994
995    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
996    async fn read_blob_states(
997        &self,
998        blob_ids: &[BlobId],
999    ) -> Result<Vec<Option<BlobState>>, ViewError> {
1000        if blob_ids.is_empty() {
1001            return Ok(Vec::new());
1002        }
1003        futures::future::try_join_all(
1004            blob_ids
1005                .iter()
1006                .map(|blob_id| self.read_blob_state(*blob_id)),
1007        )
1008        .await
1009    }
1010
1011    #[instrument(skip_all, fields(blob_id = %blob.id()))]
1012    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
1013        let mut batch = MultiPartitionBatch::new();
1014        batch.add_blob(blob);
1015        self.write_batch(batch).await?;
1016        Ok(())
1017    }
1018
1019    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1020    async fn maybe_write_blob_states(
1021        &self,
1022        blob_ids: &[BlobId],
1023        blob_state: BlobState,
1024    ) -> Result<(), ViewError> {
1025        if blob_ids.is_empty() {
1026            return Ok(());
1027        }
1028        let mut maybe_blob_states = Vec::new();
1029        for blob_id in blob_ids {
1030            let root_key = RootKey::BlobId(*blob_id).bytes();
1031            let store = self.database.open_shared(&root_key)?;
1032            let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1033            maybe_blob_states.push(maybe_blob_state);
1034        }
1035        let mut batch = MultiPartitionBatch::new();
1036        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
1037            match maybe_blob_state {
1038                None => {
1039                    batch.add_blob_state(*blob_id, &blob_state)?;
1040                }
1041                Some(state) => {
1042                    if state.epoch < blob_state.epoch {
1043                        batch.add_blob_state(*blob_id, &blob_state)?;
1044                    }
1045                }
1046            }
1047        }
1048        // We tolerate race conditions because two active chains are likely to
1049        // be both from the latest epoch, and otherwise failing to pick the
1050        // more recent blob state has limited impact.
1051        self.write_batch(batch).await?;
1052        Ok(())
1053    }
1054
1055    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1056    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
1057        if blobs.is_empty() {
1058            return Ok(Vec::new());
1059        }
1060        let mut batch = MultiPartitionBatch::new();
1061        let mut blob_states = Vec::new();
1062        for blob in blobs {
1063            let root_key = RootKey::BlobId(blob.id()).bytes();
1064            let store = self.database.open_shared(&root_key)?;
1065            let has_state = store.contains_key(BLOB_STATE_KEY).await?;
1066            blob_states.push(has_state);
1067            if has_state {
1068                batch.add_blob(blob);
1069            }
1070        }
1071        self.write_batch(batch).await?;
1072        Ok(blob_states)
1073    }
1074
1075    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1076    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
1077        if blobs.is_empty() {
1078            return Ok(());
1079        }
1080        let mut batch = MultiPartitionBatch::new();
1081        for blob in blobs {
1082            batch.add_blob(blob);
1083        }
1084        self.write_batch(batch).await
1085    }
1086
1087    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1088    async fn write_blobs_and_certificate(
1089        &self,
1090        blobs: &[Blob],
1091        certificate: &ConfirmedBlockCertificate,
1092    ) -> Result<(), ViewError> {
1093        let mut batch = MultiPartitionBatch::new();
1094        for blob in blobs {
1095            batch.add_blob(blob);
1096        }
1097        batch.add_certificate(certificate)?;
1098        self.write_batch(batch).await?;
1099        // Populate immutable-data caches so subsequent reads are served from memory.
1100        let block = certificate.value().block();
1101        let chain_id = block.header.chain_id;
1102        let height = block.header.height;
1103        let hash = certificate.hash();
1104        self.caches
1105            .block_hash_by_height
1106            .insert(&(chain_id, height), hash);
1107        for event in block.body.events.iter().flatten() {
1108            let event_id = EventId {
1109                chain_id,
1110                stream_id: event.stream_id.clone(),
1111                index: event.index,
1112            };
1113            self.caches.event_block_height.insert(&event_id, height);
1114        }
1115        Ok(())
1116    }
1117
1118    fn cache_certificate(
1119        &self,
1120        certificate: ConfirmedBlockCertificate,
1121    ) -> CacheArc<ConfirmedBlockCertificate> {
1122        self.caches
1123            .certificate
1124            .insert(&certificate.hash(), certificate)
1125    }
1126
1127    fn cache_blob(&self, blob: Blob) -> CacheArc<Blob> {
1128        self.caches.blob.insert(&blob.id(), blob)
1129    }
1130
1131    fn cache_confirmed_block(&self, block: ConfirmedBlock) -> CacheArc<ConfirmedBlock> {
1132        self.caches.confirmed_block.insert(&block.hash(), block)
1133    }
1134
1135    #[instrument(skip_all, fields(%hash))]
1136    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
1137        if self.caches.certificate.contains(&hash) || self.caches.certificate_raw.contains(&hash) {
1138            #[cfg(with_metrics)]
1139            metrics::CONTAINS_CERTIFICATE_COUNTER
1140                .with_label_values(&[metrics::CACHE])
1141                .inc();
1142            return Ok(true);
1143        }
1144        let root_key = RootKey::BlockHash(hash).bytes();
1145        let store = self.database.open_shared(&root_key)?;
1146        let results = store.contains_keys(&get_block_keys()).await?;
1147        #[cfg(with_metrics)]
1148        metrics::CONTAINS_CERTIFICATE_COUNTER
1149            .with_label_values(&[metrics::DB])
1150            .inc();
1151        Ok(results[0] && results[1])
1152    }
1153
1154    #[instrument(skip_all, fields(%hash))]
1155    async fn read_certificate(
1156        &self,
1157        hash: CryptoHash,
1158    ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, ViewError> {
1159        // Assembled certificate cache (single Arc, no re-assembly)
1160        if let Some(cert) = self.caches.certificate.get(&hash) {
1161            #[cfg(with_metrics)]
1162            metrics::READ_CERTIFICATE_COUNTER
1163                .with_label_values(&[metrics::CACHE])
1164                .inc();
1165            return Ok(Some(cert));
1166        }
1167        // Raw bytes cache — deserialize + populate caches
1168        if let Some(raw) = self.caches.certificate_raw.get(&hash) {
1169            #[cfg(with_metrics)]
1170            metrics::READ_CERTIFICATE_COUNTER
1171                .with_label_values(&[metrics::CACHE])
1172                .inc();
1173            return self.deserialize_and_cache_certificate(&raw.0, &raw.1);
1174        }
1175        // DB
1176        let root_key = RootKey::BlockHash(hash).bytes();
1177        let store = self.database.open_shared(&root_key)?;
1178        let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1179        #[cfg(with_metrics)]
1180        metrics::READ_CERTIFICATE_COUNTER
1181            .with_label_values(&[metrics::DB])
1182            .inc();
1183        let Some(lite_cert_bytes) = values[0].as_ref() else {
1184            return Ok(None);
1185        };
1186        let Some(confirmed_block_bytes) = values[1].as_ref() else {
1187            return Ok(None);
1188        };
1189        self.caches.certificate_raw.insert(
1190            &hash,
1191            (lite_cert_bytes.clone(), confirmed_block_bytes.clone()),
1192        );
1193        self.deserialize_and_cache_certificate(lite_cert_bytes, confirmed_block_bytes)
1194    }
1195
1196    #[instrument(skip_all)]
1197    async fn read_certificates(
1198        &self,
1199        hashes: &[CryptoHash],
1200    ) -> Result<Vec<Option<CacheArc<ConfirmedBlockCertificate>>>, ViewError> {
1201        let raw_certs = self.read_certificates_raw(hashes).await?;
1202
1203        raw_certs
1204            .into_iter()
1205            .map(|maybe_raw| {
1206                let Some(raw) = maybe_raw else {
1207                    return Ok(None);
1208                };
1209                self.deserialize_and_cache_certificate(&raw.0, &raw.1)
1210            })
1211            .collect()
1212    }
1213
1214    #[instrument(skip_all)]
1215    async fn read_certificates_raw(
1216        &self,
1217        hashes: &[CryptoHash],
1218    ) -> Result<Vec<Option<CacheArc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1219        if hashes.is_empty() {
1220            return Ok(Vec::new());
1221        }
1222        let mut results = vec![None; hashes.len()];
1223        let mut misses = Vec::new();
1224        for (i, hash) in hashes.iter().enumerate() {
1225            if let Some(raw) = self.caches.certificate_raw.get(hash) {
1226                results[i] = Some(raw);
1227            } else {
1228                misses.push(i);
1229            }
1230        }
1231        if !misses.is_empty() {
1232            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
1233            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
1234            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
1235                let store = self.database.open_shared(&root_key)?;
1236                let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1237                if let (Some(lite), Some(block)) = (values[0].as_ref(), values[1].as_ref()) {
1238                    results[*miss_idx] = Some(
1239                        self.caches
1240                            .certificate_raw
1241                            .insert(&hashes[*miss_idx], (lite.clone(), block.clone())),
1242                    );
1243                }
1244            }
1245        }
1246        #[cfg(with_metrics)]
1247        {
1248            let cache_hits = (hashes.len() - misses.len()) as u64;
1249            if cache_hits > 0 {
1250                metrics::READ_CERTIFICATES_COUNTER
1251                    .with_label_values(&[metrics::CACHE])
1252                    .inc_by(cache_hits);
1253            }
1254            let db_reads = misses.len() as u64;
1255            if db_reads > 0 {
1256                metrics::READ_CERTIFICATES_COUNTER
1257                    .with_label_values(&[metrics::DB])
1258                    .inc_by(db_reads);
1259            }
1260        }
1261        Ok(results)
1262    }
1263
1264    async fn read_certificate_hashes_by_heights(
1265        &self,
1266        chain_id: ChainId,
1267        heights: &[BlockHeight],
1268    ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1269        if heights.is_empty() {
1270            return Ok(Vec::new());
1271        }
1272
1273        let mut results = vec![None; heights.len()];
1274        let mut misses = Vec::new();
1275        for (i, &height) in heights.iter().enumerate() {
1276            if let Some(hash) = self.caches.block_hash_by_height.get(&(chain_id, height)) {
1277                results[i] = Some(*hash);
1278            } else {
1279                misses.push(i);
1280            }
1281        }
1282        #[cfg(with_metrics)]
1283        {
1284            let cache_hits = (heights.len() - misses.len()) as u64;
1285            if cache_hits > 0 {
1286                metrics::READ_BLOCK_HASH_BY_HEIGHT_COUNTER
1287                    .with_label_values(&[metrics::CACHE])
1288                    .inc_by(cache_hits);
1289            }
1290        }
1291        if !misses.is_empty() {
1292            let miss_keys: Vec<Vec<u8>> =
1293                misses.iter().map(|&i| to_height_key(heights[i])).collect();
1294            let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1295            let store = self.database.open_shared(&index_root_key)?;
1296            let hash_bytes = store.read_multi_values_bytes(&miss_keys).await?;
1297            #[cfg(with_metrics)]
1298            {
1299                let db_reads = misses.len() as u64;
1300                metrics::READ_BLOCK_HASH_BY_HEIGHT_COUNTER
1301                    .with_label_values(&[metrics::DB])
1302                    .inc_by(db_reads);
1303            }
1304            for (miss_idx, opt_bytes) in misses.iter().zip(hash_bytes) {
1305                if let Some(bytes) = opt_bytes {
1306                    let hash = bcs::from_bytes::<CryptoHash>(&bytes)?;
1307                    self.caches
1308                        .block_hash_by_height
1309                        .insert(&(chain_id, heights[*miss_idx]), hash);
1310                    results[*miss_idx] = Some(hash);
1311                }
1312            }
1313        }
1314
1315        Ok(results)
1316    }
1317
1318    async fn read_event_block_heights(
1319        &self,
1320        event_ids: &[EventId],
1321    ) -> Result<Vec<Option<BlockHeight>>, ViewError> {
1322        if event_ids.is_empty() {
1323            return Ok(Vec::new());
1324        }
1325
1326        let mut results = vec![None; event_ids.len()];
1327        // Check cache first; collect misses.
1328        let mut misses: Vec<usize> = Vec::new();
1329        for (i, event_id) in event_ids.iter().enumerate() {
1330            if let Some(height) = self.caches.event_block_height.get(event_id) {
1331                results[i] = Some(*height);
1332            } else {
1333                misses.push(i);
1334            }
1335        }
1336        #[cfg(with_metrics)]
1337        {
1338            let cache_hits = (event_ids.len() - misses.len()) as u64;
1339            if cache_hits > 0 {
1340                metrics::READ_EVENT_BLOCK_HEIGHT_COUNTER
1341                    .with_label_values(&[metrics::CACHE])
1342                    .inc_by(cache_hits);
1343            }
1344        }
1345        if misses.is_empty() {
1346            return Ok(results);
1347        }
1348        // Group cache-miss event IDs by chain ID for batch lookups per partition.
1349        let mut chain_groups = BTreeMap::<_, Vec<_>>::new();
1350        for &i in &misses {
1351            let event_id = &event_ids[i];
1352            chain_groups
1353                .entry(event_id.chain_id)
1354                .or_default()
1355                .push((i, to_event_key(event_id)));
1356        }
1357        for (chain_id, entries) in chain_groups {
1358            let root_key = RootKey::EventBlockHeight(chain_id).bytes();
1359            let store = self.database.open_shared(&root_key)?;
1360            let keys = entries
1361                .iter()
1362                .map(|(_, key)| key.clone())
1363                .collect::<Vec<_>>();
1364            let values = store.read_multi_values_bytes(&keys).await?;
1365            #[cfg(with_metrics)]
1366            {
1367                let db_reads = entries.len() as u64;
1368                metrics::READ_EVENT_BLOCK_HEIGHT_COUNTER
1369                    .with_label_values(&[metrics::DB])
1370                    .inc_by(db_reads);
1371            }
1372            for ((original_index, _), value) in entries.into_iter().zip(values) {
1373                if let Some(bytes) = value {
1374                    let height = bcs::from_bytes::<BlockHeight>(&bytes)?;
1375                    self.caches
1376                        .event_block_height
1377                        .insert(&event_ids[original_index], height);
1378                    results[original_index] = Some(height);
1379                }
1380            }
1381        }
1382        Ok(results)
1383    }
1384
1385    #[instrument(skip_all)]
1386    async fn read_certificates_by_heights_raw(
1387        &self,
1388        chain_id: ChainId,
1389        heights: &[BlockHeight],
1390    ) -> Result<Vec<Option<CacheArc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1391        let hashes: Vec<Option<CryptoHash>> = self
1392            .read_certificate_hashes_by_heights(chain_id, heights)
1393            .await?;
1394
1395        // Map from hash to all indices in the heights array (handles duplicates)
1396        let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
1397        for (index, maybe_hash) in hashes.iter().enumerate() {
1398            if let Some(hash) = maybe_hash {
1399                indices.entry(*hash).or_default().push(index);
1400            }
1401        }
1402
1403        // Deduplicate hashes for the storage query
1404        let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
1405
1406        let mut result = vec![None; heights.len()];
1407
1408        for (raw_cert, hash) in self
1409            .read_certificates_raw(&unique_hashes)
1410            .await?
1411            .into_iter()
1412            .zip(unique_hashes)
1413        {
1414            if let Some(idx_list) = indices.get(&hash) {
1415                for &index in idx_list {
1416                    result[index] = raw_cert.clone();
1417                }
1418            } else {
1419                // This should not happen, but log a warning if it does.
1420                tracing::error!(?hash, "certificate hash not found in indices map",);
1421            }
1422        }
1423
1424        Ok(result)
1425    }
1426
1427    #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1428    async fn read_certificates_by_heights(
1429        &self,
1430        chain_id: ChainId,
1431        heights: &[BlockHeight],
1432    ) -> Result<Vec<Option<CacheArc<ConfirmedBlockCertificate>>>, ViewError> {
1433        self.read_certificates_by_heights_raw(chain_id, heights)
1434            .await?
1435            .into_iter()
1436            .map(|maybe_raw| match maybe_raw {
1437                None => Ok(None),
1438                Some(raw) => self.deserialize_and_cache_certificate(&raw.0, &raw.1),
1439            })
1440            .collect()
1441    }
1442
1443    #[instrument(skip_all, fields(event_id = ?event_id))]
1444    async fn read_event(&self, event_id: EventId) -> Result<Option<CacheArc<Vec<u8>>>, ViewError> {
1445        if let Some(event) = self.caches.event.get(&event_id) {
1446            #[cfg(with_metrics)]
1447            metrics::READ_EVENT_COUNTER
1448                .with_label_values(&[metrics::CACHE])
1449                .inc();
1450            return Ok(Some(event));
1451        }
1452        let event_key = to_event_key(&event_id);
1453        let root_key = RootKey::Event(event_id.chain_id).bytes();
1454        let store = self.database.open_shared(&root_key)?;
1455        let event = store.read_value_bytes(&event_key).await?;
1456        #[cfg(with_metrics)]
1457        metrics::READ_EVENT_COUNTER
1458            .with_label_values(&[metrics::DB])
1459            .inc();
1460        match event {
1461            Some(event_bytes) => Ok(Some(self.caches.event.insert(&event_id, event_bytes))),
1462            None => Ok(None),
1463        }
1464    }
1465
1466    #[instrument(skip_all, fields(event_id = ?event_id))]
1467    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1468        if self.caches.event.contains(&event_id) {
1469            #[cfg(with_metrics)]
1470            metrics::CONTAINS_EVENT_COUNTER
1471                .with_label_values(&[metrics::CACHE])
1472                .inc();
1473            return Ok(true);
1474        }
1475        let event_key = to_event_key(&event_id);
1476        let root_key = RootKey::Event(event_id.chain_id).bytes();
1477        let store = self.database.open_shared(&root_key)?;
1478        let exists = store.contains_key(&event_key).await?;
1479        #[cfg(with_metrics)]
1480        metrics::CONTAINS_EVENT_COUNTER
1481            .with_label_values(&[metrics::DB])
1482            .inc();
1483        Ok(exists)
1484    }
1485
1486    #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
1487    async fn read_events_from_index(
1488        &self,
1489        chain_id: &ChainId,
1490        stream_id: &StreamId,
1491        start_index: u32,
1492    ) -> Result<Vec<IndexAndEvent>, ViewError> {
1493        let root_key = RootKey::Event(*chain_id).bytes();
1494        let store = self.database.open_shared(&root_key)?;
1495        // Pair each index with its cached value, or `None` for a cache miss to be
1496        // read from the database, so results keep the key-scan order.
1497        let mut entries = Vec::new();
1498        let mut db_keys = Vec::new();
1499        let prefix = bcs::to_bytes(stream_id).unwrap();
1500        for short_key in store.find_keys_by_prefix(&prefix).await? {
1501            let index = bcs::from_bytes::<u32>(&short_key)?;
1502            if index >= start_index {
1503                let event_id = EventId {
1504                    chain_id: *chain_id,
1505                    stream_id: stream_id.clone(),
1506                    index,
1507                };
1508                let cached = self.caches.event.get(&event_id).map(|arc| (*arc).clone());
1509                if cached.is_none() {
1510                    let mut key = prefix.clone();
1511                    key.extend(short_key);
1512                    db_keys.push(key);
1513                }
1514                entries.push((index, cached));
1515            }
1516        }
1517        let mut db_values = if db_keys.is_empty() {
1518            Vec::new()
1519        } else {
1520            store.read_multi_values_bytes(&db_keys).await?
1521        }
1522        .into_iter();
1523        let mut returned_values = Vec::with_capacity(entries.len());
1524        for (index, cached) in entries {
1525            let event = match cached {
1526                Some(event) => event,
1527                None => {
1528                    let event_bytes = db_values
1529                        .next()
1530                        .expect("one database value per cache miss")
1531                        .unwrap();
1532                    let event_id = EventId {
1533                        chain_id: *chain_id,
1534                        stream_id: stream_id.clone(),
1535                        index,
1536                    };
1537                    self.caches.event.insert(&event_id, event_bytes.clone());
1538                    event_bytes
1539                }
1540            };
1541            returned_values.push(IndexAndEvent { index, event });
1542        }
1543        Ok(returned_values)
1544    }
1545
1546    #[instrument(skip_all)]
1547    async fn write_events(
1548        &self,
1549        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1550    ) -> Result<(), ViewError> {
1551        let mut batch = MultiPartitionBatch::new();
1552        for (event_id, value) in events {
1553            batch.add_event(&event_id, value);
1554        }
1555        self.write_batch(batch).await
1556    }
1557
1558    #[instrument(skip_all)]
1559    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1560        if let Some(desc) = self.caches.network_description.get() {
1561            #[cfg(with_metrics)]
1562            metrics::READ_NETWORK_DESCRIPTION
1563                .with_label_values(&[metrics::CACHE])
1564                .inc();
1565            return Ok(Some(desc.clone()));
1566        }
1567        let root_key = RootKey::NetworkDescription.bytes();
1568        let store = self.database.open_shared(&root_key)?;
1569        let maybe_value: Option<NetworkDescription> =
1570            store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1571        #[cfg(with_metrics)]
1572        metrics::READ_NETWORK_DESCRIPTION
1573            .with_label_values(&[metrics::DB])
1574            .inc();
1575        if let Some(ref desc) = maybe_value {
1576            if self.caches.network_description.set(desc.clone()).is_err() {
1577                debug!("network description cache was already populated concurrently");
1578            }
1579        }
1580        Ok(maybe_value)
1581    }
1582
1583    #[instrument(skip_all)]
1584    async fn write_network_description(
1585        &self,
1586        information: &NetworkDescription,
1587    ) -> Result<(), ViewError> {
1588        let mut batch = MultiPartitionBatch::new();
1589        batch.add_network_description(information)?;
1590        self.write_batch(batch).await?;
1591        Ok(())
1592    }
1593
1594    fn wasm_runtime(&self) -> Option<WasmRuntime> {
1595        self.wasm_runtime
1596    }
1597
1598    #[instrument(skip_all)]
1599    async fn block_exporter_context(
1600        &self,
1601        block_exporter_id: u32,
1602    ) -> Result<Self::BlockExporterContext, ViewError> {
1603        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1604        let store = self.database.open_exclusive(&root_key)?;
1605        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1606    }
1607
1608    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1609        let root_keys = self.database.list_root_keys().await?;
1610        let mut blob_ids = Vec::new();
1611        for root_key in root_keys {
1612            if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1613                let root_key_red = &root_key[1..];
1614                let blob_id = bcs::from_bytes(root_key_red)?;
1615                blob_ids.push(blob_id);
1616            }
1617        }
1618        Ok(blob_ids)
1619    }
1620
1621    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1622        let root_keys = self.database.list_root_keys().await?;
1623        let mut chain_ids = Vec::new();
1624        for root_key in root_keys {
1625            if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1626                let root_key_red = &root_key[1..];
1627                let chain_id = bcs::from_bytes(root_key_red)?;
1628                chain_ids.push(chain_id);
1629            }
1630        }
1631        Ok(chain_ids)
1632    }
1633
1634    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
1635        let root_keys = self.database.list_root_keys().await?;
1636        let mut event_ids = Vec::new();
1637        for root_key in root_keys {
1638            if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
1639                let root_key_red = &root_key[1..];
1640                let chain_id = bcs::from_bytes(root_key_red)?;
1641                let store = self.database.open_shared(&root_key)?;
1642                let keys = store.find_keys_by_prefix(&[]).await?;
1643                for key in keys {
1644                    let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
1645                    let event_id = EventId {
1646                        chain_id,
1647                        stream_id: restricted_event_id.stream_id,
1648                        index: restricted_event_id.index,
1649                    };
1650                    event_ids.push(event_id);
1651                }
1652            }
1653        }
1654        Ok(event_ids)
1655    }
1656}
1657
1658impl<Database, C> DbStorage<Database, C>
1659where
1660    Database: KeyValueDatabase + Clone,
1661    Database::Store: KeyValueStore + Clone,
1662    C: Clock,
1663    Database::Error: Send + Sync,
1664{
1665    #[instrument(skip_all)]
1666    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1667        hashes
1668            .iter()
1669            .map(|hash| RootKey::BlockHash(*hash).bytes())
1670            .collect()
1671    }
1672
1673    fn deserialize_and_cache_certificate(
1674        &self,
1675        lite_cert_bytes: &[u8],
1676        confirmed_block_bytes: &[u8],
1677    ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, ViewError> {
1678        let lite = bcs::from_bytes::<LiteCertificate>(lite_cert_bytes)?;
1679        let block = bcs::from_bytes::<ConfirmedBlock>(confirmed_block_bytes)?;
1680        let hash = block.hash();
1681        self.caches.confirmed_block.insert(&hash, block.clone());
1682        let certificate = lite
1683            .with_value(block)
1684            .ok_or(ViewError::InconsistentEntries)?;
1685        let arc = self.caches.certificate.insert(&hash, certificate);
1686        Ok(Some(arc))
1687    }
1688
1689    #[instrument(skip_all)]
1690    async fn write_entry(
1691        store: &Database::Store,
1692        key_values: Vec<(Vec<u8>, Vec<u8>)>,
1693    ) -> Result<(), ViewError> {
1694        let mut batch = Batch::new();
1695        for (key, value) in key_values {
1696            batch.put_key_value_bytes(key, value);
1697        }
1698        store.write_batch(batch).await?;
1699        Ok(())
1700    }
1701
1702    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1703    async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1704        if batch.keys_value_bytes.is_empty() {
1705            return Ok(());
1706        }
1707        let mut futures = Vec::new();
1708        for (root_key, key_values) in batch.keys_value_bytes {
1709            let store = self.database.open_shared(&root_key)?;
1710            futures.push(async move { Self::write_entry(&store, key_values).await });
1711        }
1712        futures::future::try_join_all(futures).await?;
1713        Ok(())
1714    }
1715}
1716
1717impl<Database, C> DbStorage<Database, C> {
1718    fn new(
1719        database: Database,
1720        wasm_runtime: Option<WasmRuntime>,
1721        cache_sizes: StorageCacheConfig,
1722        clock: C,
1723    ) -> Self {
1724        Self {
1725            database: Arc::new(database),
1726            clock,
1727            // The `Arc` here is required on native but useless on the Web.
1728            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
1729            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
1730            wasm_runtime,
1731            user_contracts: Arc::new(papaya::HashMap::new()),
1732            user_services: Arc::new(papaya::HashMap::new()),
1733            shared_committees: SharedCommittees::new(),
1734            caches: StorageCaches::new(cache_sizes),
1735            execution_runtime_config: ExecutionRuntimeConfig::default(),
1736        }
1737    }
1738
1739    /// Sets whether contract log messages should be output.
1740    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
1741        self.execution_runtime_config.allow_application_logs = allow;
1742        self
1743    }
1744}
1745
1746impl<Database> DbStorage<Database, WallClock>
1747where
1748    Database: KeyValueDatabase + Clone + 'static,
1749    Database::Error: Send + Sync,
1750    Database::Store: KeyValueStore + Clone + 'static,
1751{
1752    /// Connects to the storage in the given namespace, creating it if it does not exist.
1753    pub async fn maybe_create_and_connect(
1754        config: &Database::Config,
1755        namespace: &str,
1756        wasm_runtime: Option<WasmRuntime>,
1757        cache_sizes: StorageCacheConfig,
1758    ) -> Result<Self, Database::Error> {
1759        let database = Database::maybe_create_and_connect(config, namespace).await?;
1760        Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
1761    }
1762
1763    /// Connects to the existing storage in the given namespace.
1764    pub async fn connect(
1765        config: &Database::Config,
1766        namespace: &str,
1767        wasm_runtime: Option<WasmRuntime>,
1768        cache_sizes: StorageCacheConfig,
1769    ) -> Result<Self, Database::Error> {
1770        let database = Database::connect(config, namespace).await?;
1771        Ok(Self::new(database, wasm_runtime, cache_sizes, WallClock))
1772    }
1773}
1774
#[cfg(with_testing)]
impl<Database, C> DbStorage<Database, C>
where
    Database: linera_views::backends::DatabaseBackup,
{
    /// Backs up the underlying database to the given directory.
    pub fn backup_to(&self, dir: &std::path::Path) -> anyhow::Result<()> {
        let database: &Database = &self.database;
        database.backup_to(dir)
    }
}
1785
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a test storage in a fresh random namespace with a `TestClock`.
    ///
    /// # Panics
    ///
    /// Panics if the test configuration cannot be built or the database cannot be created.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let test_config = Database::new_test_config().await.unwrap();
        let fresh_namespace = generate_test_namespace();
        Self::new_for_testing(test_config, &fresh_namespace, wasm_runtime, TestClock::new())
            .await
            .unwrap()
    }

    /// Recreates the storage in the given namespace and connects to it, for testing.
    ///
    /// Uses `DEFAULT_STORAGE_CACHE_CONFIG` for the cache sizes.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        Database::recreate_and_connect(&config, namespace)
            .await
            .map(|database| {
                Self::new(database, wasm_runtime, DEFAULT_STORAGE_CACHE_CONFIG, clock)
            })
    }

    /// Connects to the existing storage in the given namespace, for testing.
    ///
    /// Uses `DEFAULT_STORAGE_CACHE_CONFIG` for the cache sizes.
    pub async fn connect_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        Database::connect(&config, namespace)
            .await
            .map(|database| {
                Self::new(database, wasm_runtime, DEFAULT_STORAGE_CACHE_CONFIG, clock)
            })
    }
}
1839
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::{CryptoHash, TestString},
        data_types::{BlockHeight, Epoch, Round, Timestamp},
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };
    use linera_chain::{
        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
        types::ConfirmedBlockCertificate,
    };
    use linera_views::{
        memory::MemoryDatabase,
        store::{KeyValueDatabase, ReadableKeyValueStore as _},
    };

    use crate::{
        db_storage::{
            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
            EVENT_ID_TAG,
        },
        DbStorage, Storage, TestClock,
    };

    // Several functionalities of the storage rely on the way that the serialization
    // is done. Thus we need to check that the serialization works in the way that
    // we expect.

    /// Builds an unsigned certificate for an empty block of `chain_id` at `height`.
    ///
    /// Every placeholder header hash is derived from `tag`, so certificates built with
    /// different tags have distinct header contents. The certificate is only meant to be
    /// written to and read back from storage.
    #[cfg(with_testing)]
    fn make_certificate(
        chain_id: ChainId,
        height: BlockHeight,
        tag: &str,
    ) -> ConfirmedBlockCertificate {
        let placeholder_hash =
            |name: &str| CryptoHash::new(&TestString::new(format!("{name}_{tag}")));
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: placeholder_hash("state_hash"),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: placeholder_hash("transactions_hash"),
                messages_hash: placeholder_hash("messages_hash"),
                previous_message_blocks_hash: placeholder_hash("previous_message_blocks_hash"),
                previous_event_blocks_hash: placeholder_hash("previous_event_blocks_hash"),
                oracle_responses_hash: placeholder_hash("oracle_responses_hash"),
                events_hash: placeholder_hash("events_hash"),
                blobs_hash: placeholder_hash("blobs_hash"),
                operation_results_hash: placeholder_hash("operation_results_hash"),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        ConfirmedBlockCertificate::new(ConfirmedBlock::new(block), Round::Fast, vec![])
    }

    // The listing of the blobs in `list_blob_ids` depends on the serialization
    // of `RootKey::BlobId`: the tag byte followed by the BCS-encoded `BlobId`.
    #[test]
    fn test_root_key_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let root_key = RootKey::BlobId(blob_id).bytes();
        assert_eq!(root_key[0], BLOB_ID_TAG);
        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
    }

    // The listing of the chains in `list_chain_ids` depends on the serialization
    // of `RootKey::ChainState`: the tag byte followed by the BCS-encoded `ChainId`.
    #[test]
    fn test_root_key_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let root_key = RootKey::ChainState(chain_id).bytes();
        assert_eq!(root_key[0], CHAIN_ID_TAG);
        assert_eq!(
            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
            chain_id
        );
    }

    // Reading events by index (`read_events_from_index`) relies on event keys starting
    // with the serialized `StreamId`. Listing events (`list_event_ids`) relies on
    // `RootKey::Event` being the tag byte followed by the BCS-encoded `ChainId`.
    #[test]
    fn test_root_key_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let prefix = bcs::to_bytes(&stream_id).unwrap();

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        let root_key = RootKey::Event(chain_id).bytes();
        assert_eq!(root_key[0], EVENT_ID_TAG);
        assert_eq!(
            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
            chain_id
        );
        let key = to_event_key(&event_id);
        assert!(key.starts_with(&prefix));
    }

    // The height index lookup depends on the serialization of `RootKey::BlockByHeight`
    // and `to_height_key`, following the same pattern as `RootKey::Event`.
    #[test]
    fn test_root_key_block_by_height_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let height = BlockHeight(42);

        // `RootKey::BlockByHeight` uses only the `ChainId` for partitioning (like `Event`).
        let root_key = RootKey::BlockByHeight(chain_id).bytes();
        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
        assert_eq!(deserialized_chain_id, chain_id);

        // The height is encoded as the key inside the partition (like the event index).
        let height_key = to_height_key(height);
        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
        assert_eq!(deserialized_height, height);
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        let certificate = make_certificate(chain_id, height, "single");

        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // The height index lives in a per-chain partition (like events): the key is the
        // height and the value is the certificate hash.
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store
            .read_value_bytes(&height_key)
            .await
            .unwrap()
            .expect("Height index was not created");
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes).unwrap();
        assert_eq!(
            stored_hash,
            certificate.hash(),
            "Height index contains wrong hash"
        );
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write certificates at heights 1, 3 and 5.
        let expected_certs: Vec<_> = [1, 3, 5]
            .into_iter()
            .map(|height| make_certificate(chain_id, BlockHeight(height), &format!("h{height}")))
            .collect();
        let mut batch = MultiPartitionBatch::new();
        for cert in &expected_certs {
            batch.add_certificate(cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();
        let hash_1 = expected_certs[0].hash();
        let hash_3 = expected_certs[1].hash();
        let hash_5 = expected_certs[2].hash();

        // Read in order [1, 3, 5].
        let heights = [BlockHeight(1), BlockHeight(3), BlockHeight(5)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        let hashes: Vec<_> = result
            .iter()
            .map(|cert| cert.as_ref().map(|cert| cert.hash()))
            .collect();
        assert_eq!(hashes, vec![Some(hash_1), Some(hash_3), Some(hash_5)]);

        // Read out of order [5, 1, 3]: results follow the requested order.
        let heights = [BlockHeight(5), BlockHeight(1), BlockHeight(3)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        let hashes: Vec<_> = result
            .iter()
            .map(|cert| cert.as_ref().map(|cert| cert.hash()))
            .collect();
        assert_eq!(hashes, vec![Some(hash_5), Some(hash_1), Some(hash_3)]);

        // Read [1, 2, 3, 3]: height 2 is missing and height 3 is requested twice.
        let heights = [
            BlockHeight(1),
            BlockHeight(2),
            BlockHeight(3),
            BlockHeight(3),
        ];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        let hashes: Vec<_> = result
            .iter()
            .map(|cert| cert.as_ref().map(|cert| cert.hash()))
            .collect();
        assert_eq!(hashes, vec![Some(hash_1), None, Some(hash_3), Some(hash_3)]);

        // An empty request yields an empty result.
        let result = storage
            .read_certificates_by_heights(chain_id, &[])
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_multiple_chains() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Both chains have a certificate at height 10; only chain B has one at height 20.
        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));
        let cert_a = make_certificate(chain_a, BlockHeight(10), "a");
        let cert_b = make_certificate(chain_b, BlockHeight(10), "b");
        let cert_b_20 = make_certificate(chain_b, BlockHeight(20), "b_20");

        let mut batch = MultiPartitionBatch::new();
        for cert in [&cert_a, &cert_b, &cert_b_20] {
            batch.add_certificate(cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();

        // Read from chain A: should get cert A.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

        // Read from chain B: should get cert B.
        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

        // Chain B's height-20 certificate is readable from chain B...
        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(20)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b_20.hash());

        // ...but not from chain A, which has no block at that height.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
            .await
            .unwrap();
        assert!(result[0].is_none());
    }

    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_consistency() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        let cert = make_certificate(chain_id, BlockHeight(7), "consistency");
        let hash = cert.hash();
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&cert).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Read by hash.
        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();

        // Read by height.
        let certs_by_height = storage
            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
            .await
            .unwrap();
        let cert_by_height = certs_by_height[0].as_ref().unwrap();

        // Both lookups must return the same certificate.
        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
        assert_eq!(
            cert_by_hash.value().block().header,
            cert_by_height.value().block().header
        );
    }
}