// linera_storage/db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    fmt::Debug,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_chain::{
19    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
20    ChainStateView,
21};
22use linera_execution::{
23    BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
24};
25use linera_views::{
26    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
27    batch::Batch,
28    context::ViewContext,
29    store::{
30        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
31    },
32    views::View,
33    ViewError,
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37#[cfg(with_testing)]
38use {
39    futures::channel::oneshot::{self, Receiver},
40    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
41    std::cmp::Reverse,
42};
43
44use crate::{ChainRuntimeContext, Clock, Storage};
45
#[cfg(with_metrics)]
pub mod metrics {
    //! Prometheus counters and histograms tracking storage operations.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// The metric counting how often a blob is tested for existence from storage
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// The metric counting how often blob states are read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            // Fixed typo in the exported help text ("certificate are").
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// The metric counting how often an event is tested for existence from storage
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// The metric counting how often the network description is read from storage.
    // NOTE(review): the exported name lacks the `read_` prefix, unlike its
    // `write_network_description` counterpart. Left unchanged so existing
    // dashboards keep working; consider renaming in a breaking release.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
242
// Single-byte keys used inside a partition; the partition itself is selected
// by the serialized `RootKey` (see below).

/// The key used for blobs. The Blob ID itself is contained in the root key.
const BLOB_KEY: &[u8] = &[0];

/// The key used for blob states. The Blob ID itself is contained in the root key.
const BLOB_STATE_KEY: &[u8] = &[1];

/// The key used for lite certificates. The cryptohash itself is contained in the root key.
const LITE_CERTIFICATE_KEY: &[u8] = &[2];

/// The key used for confirmed blocks. The cryptohash itself is contained in the root key.
const BLOCK_KEY: &[u8] = &[3];

/// The key used for the network description.
const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];
257
258fn get_block_keys() -> Vec<Vec<u8>> {
259    vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
260}
261
/// A set of key/value writes grouped by the partition (root key) they target.
#[derive(Default)]
#[allow(clippy::type_complexity)]
struct MultiPartitionBatch {
    // Maps a serialized `RootKey` to the key/value pairs to write in that partition.
    keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
}
267
268impl MultiPartitionBatch {
269    fn new() -> Self {
270        Self::default()
271    }
272
273    fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
274        let entry = self.keys_value_bytes.entry(root_key).or_default();
275        entry.extend(key_values);
276    }
277
278    fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
279        self.put_key_values(root_key, vec![(key, value)]);
280    }
281
282    fn add_blob(&mut self, blob: &Blob) {
283        #[cfg(with_metrics)]
284        metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
285        let root_key = RootKey::BlobId(blob.id()).bytes();
286        let key = BLOB_KEY.to_vec();
287        self.put_key_value(root_key, key, blob.bytes().to_vec());
288    }
289
290    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
291        let root_key = RootKey::BlobId(blob_id).bytes();
292        let key = BLOB_STATE_KEY.to_vec();
293        let value = bcs::to_bytes(blob_state)?;
294        self.put_key_value(root_key, key, value);
295        Ok(())
296    }
297
298    /// Adds a certificate to the batch.
299    ///
300    /// Writes both the certificate data (indexed by hash) and a height index
301    /// (mapping chain_id + height to hash).
302    ///
303    /// Note: If called multiple times with the same `(chain_id, height)`, the height
304    /// index will be overwritten. The caller is responsible for ensuring that
305    /// certificates at the same height have the same hash.
306    fn add_certificate(
307        &mut self,
308        certificate: &ConfirmedBlockCertificate,
309    ) -> Result<(), ViewError> {
310        #[cfg(with_metrics)]
311        metrics::WRITE_CERTIFICATE_COUNTER
312            .with_label_values(&[])
313            .inc();
314        let hash = certificate.hash();
315
316        // Write certificate data by hash
317        let root_key = RootKey::BlockHash(hash).bytes();
318        let mut key_values = Vec::new();
319        let key = LITE_CERTIFICATE_KEY.to_vec();
320        let value = bcs::to_bytes(&certificate.lite_certificate())?;
321        key_values.push((key, value));
322        let key = BLOCK_KEY.to_vec();
323        let value = bcs::to_bytes(&certificate.value())?;
324        key_values.push((key, value));
325        self.put_key_values(root_key, key_values);
326
327        // Write height index: chain_id -> height -> hash
328        let chain_id = certificate.value().block().header.chain_id;
329        let height = certificate.value().block().header.height;
330        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
331        let height_key = to_height_key(height);
332        let index_value = bcs::to_bytes(&hash)?;
333        self.put_key_value(index_root_key, height_key, index_value);
334
335        Ok(())
336    }
337
338    fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
339        #[cfg(with_metrics)]
340        metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
341        let key = to_event_key(event_id);
342        let root_key = RootKey::Event(event_id.chain_id).bytes();
343        self.put_key_value(root_key, key, value);
344    }
345
346    fn add_network_description(
347        &mut self,
348        information: &NetworkDescription,
349    ) -> Result<(), ViewError> {
350        #[cfg(with_metrics)]
351        metrics::WRITE_NETWORK_DESCRIPTION
352            .with_label_values(&[])
353            .inc();
354        let root_key = RootKey::NetworkDescription.bytes();
355        let key = NETWORK_DESCRIPTION_KEY.to_vec();
356        let value = bcs::to_bytes(information)?;
357        self.put_key_value(root_key, key, value);
358        Ok(())
359    }
360}
361
/// Main implementation of the [`Storage`] trait.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying key-value database, shared between clones.
    database: Arc<Database>,
    /// Source of the current time; `WallClock` in production, a test clock in tests.
    clock: Clock,
    /// Shared thread pool handed to chain runtime contexts.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime to use, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Shared map of loaded user contract code, keyed by application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Shared map of loaded user service code, keyed by application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// Configuration of the execution runtime.
    execution_runtime_config: ExecutionRuntimeConfig,
}
373
/// The root key (partition discriminator) under which each kind of data lives.
///
/// BCS serialization puts the variant index in the first byte of the root key,
/// so the `*_TAG` constants below must stay in sync with the variant order here.
#[derive(Debug, Serialize, Deserialize)]
enum RootKey {
    NetworkDescription,
    BlockExporterState(u32),
    ChainState(ChainId),
    BlockHash(CryptoHash),
    BlobId(BlobId),
    Event(ChainId),
    BlockByHeight(ChainId),
}
384
/// BCS variant index of [`RootKey::ChainState`]; must match the enum order.
const CHAIN_ID_TAG: u8 = 2;
/// BCS variant index of [`RootKey::BlobId`]; must match the enum order.
const BLOB_ID_TAG: u8 = 4;
/// BCS variant index of [`RootKey::Event`]; must match the enum order.
const EVENT_ID_TAG: u8 = 5;
388
impl RootKey {
    /// Serializes this root key with BCS.
    ///
    /// Serializing these in-memory variants cannot fail, hence the `unwrap`.
    fn bytes(&self) -> Vec<u8> {
        bcs::to_bytes(self).unwrap()
    }
}
394
/// The portion of an [`EventId`] encoded into an event's key: the chain ID is
/// omitted because it is already part of the partition's root key.
#[derive(Debug, Serialize, Deserialize)]
struct RestrictedEventId {
    pub stream_id: StreamId,
    pub index: u32,
}
400
401fn to_event_key(event_id: &EventId) -> Vec<u8> {
402    let restricted_event_id = RestrictedEventId {
403        stream_id: event_id.stream_id.clone(),
404        index: event_id.index,
405    };
406    bcs::to_bytes(&restricted_event_id).unwrap()
407}
408
/// Computes the key under which a block hash is indexed by height.
///
/// NOTE(review): BCS encodes fixed-size integers in little-endian order, so if
/// `BlockHeight` serializes as a plain integer, the lexicographic order of these
/// keys differs from numeric height order — confirm no range scan relies on
/// keys being height-ordered.
pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
    bcs::to_bytes(&height).unwrap()
}
412
413fn is_chain_state(root_key: &[u8]) -> bool {
414    if root_key.is_empty() {
415        return false;
416    }
417    root_key[0] == CHAIN_ID_TAG
418}
419
/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
/// chain states into the first store.
///
/// All other partitions, including the empty root key, go to the second store.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
424
425impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
426    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
427        if root_key.is_empty() {
428            return Ok(StoreInUse::Second);
429        }
430        let store = match is_chain_state(root_key) {
431            true => StoreInUse::First,
432            false => StoreInUse::Second,
433        };
434        Ok(store)
435    }
436}
437
/// A `Clock` implementation using the system clock.
///
/// This is the default clock parameter of [`DbStorage`].
#[derive(Clone)]
pub struct WallClock;
441
442#[cfg_attr(not(web), async_trait)]
443#[cfg_attr(web, async_trait(?Send))]
444impl Clock for WallClock {
445    fn current_time(&self) -> Timestamp {
446        Timestamp::now()
447    }
448
449    async fn sleep(&self, delta: TimeDelta) {
450        linera_base::time::timer::sleep(delta.as_duration()).await
451    }
452
453    async fn sleep_until(&self, timestamp: Timestamp) {
454        let delta = timestamp.delta_since(Timestamp::now());
455        if delta > TimeDelta::ZERO {
456            self.sleep(delta).await
457        }
458    }
459}
460
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    // The current simulated time.
    time: Timestamp,
    // Pending sleepers keyed by `Reverse(deadline)`, so `split_off` can extract
    // exactly the entries whose deadline is <= a given time.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional callback that decides whether to auto-advance for a given target timestamp.
    /// Returns `true` if the clock should auto-advance to that time.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
470
#[cfg(with_testing)]
impl TestClockInner {
    /// Moves the clock to `time` and wakes every sleeper whose deadline has passed.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are `Reverse(deadline)`, so `split_off(&Reverse(time))` extracts
        // exactly the senders whose deadline is <= `time`.
        let due = self.sleeps.split_off(&Reverse(time));
        for sender in due.into_values().flatten() {
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        }
    }

    /// Registers a sleep of `delta` relative to the current simulated time.
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        self.add_sleep_until(self.time.saturating_add(delta))
    }

    /// Registers a sleep with an absolute deadline.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        // The callback (if any) is consulted exactly once per registered sleep.
        let auto_advance = self
            .sleep_callback
            .as_ref()
            .is_some_and(|callback| callback(time));
        if auto_advance && time > self.time {
            // Auto-advance mode: jump the clock forward and complete this sleep.
            self.set(time);
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else if self.time >= time {
            // Deadline already reached: complete immediately.
            sender.send(()).ok();
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
507
/// A clock implementation that uses a stored number of microseconds and that can be updated
/// explicitly. All clones share the same time, and setting it in one clone updates all the others.
#[cfg(with_testing)]
#[derive(Clone, Default)]
// The `Arc<Mutex<..>>` is what makes all clones observe the same time.
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
513
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the simulated time shared by all clones.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the simulated clock has advanced by `delta`.
    async fn sleep(&self, delta: TimeDelta) {
        if delta == TimeDelta::ZERO {
            return;
        }
        let pending = self.lock().add_sleep(delta);
        // Sender may have been dropped if the clock was dropped; just stop waiting.
        pending.await.ok();
    }

    /// Waits until the simulated clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        let pending = self.lock().add_sleep_until(timestamp);
        // Sender may have been dropped if the clock was dropped; just stop waiting.
        pending.await.ok();
    }
}
537
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by the specified delta.
    pub fn add(&self, delta: TimeDelta) {
        let mut inner = self.lock();
        let new_time = inner.time.saturating_add(delta);
        inner.set(new_time);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Sets a callback that decides whether to auto-advance for each sleep call.
    ///
    /// The callback receives the target timestamp and should return `true` if the clock
    /// should auto-advance to that time, or `false` if the sleep should block normally.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        self.lock().sleep_callback = Some(Box::new(callback));
    }

    /// Clears the sleep callback.
    pub fn clear_sleep_callback(&self) {
        self.lock().sleep_callback = None;
    }

    /// Acquires the shared inner state, panicking if the mutex was poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
582
583#[cfg_attr(not(web), async_trait)]
584#[cfg_attr(web, async_trait(?Send))]
585impl<Database, C> Storage for DbStorage<Database, C>
586where
587    Database: KeyValueDatabase<
588            Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
589            Error: Send + Sync,
590        > + Clone
591        + linera_base::util::traits::AutoTraits
592        + 'static,
593    C: Clock + Clone + Send + Sync + 'static,
594{
595    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
596    type Clock = C;
597    type BlockExporterContext = ViewContext<u32, Database::Store>;
598
    /// Returns the clock backing this storage.
    fn clock(&self) -> &C {
        &self.clock
    }
602
    /// Returns the shared execution thread pool.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
606
607    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
608    async fn load_chain(
609        &self,
610        chain_id: ChainId,
611    ) -> Result<ChainStateView<Self::Context>, ViewError> {
612        #[cfg(with_metrics)]
613        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
614        let runtime_context = ChainRuntimeContext {
615            storage: self.clone(),
616            thread_pool: self.thread_pool.clone(),
617            chain_id,
618            execution_runtime_config: self.execution_runtime_config,
619            user_contracts: self.user_contracts.clone(),
620            user_services: self.user_services.clone(),
621        };
622        let root_key = RootKey::ChainState(chain_id).bytes();
623        let store = self.database.open_exclusive(&root_key)?;
624        let context = ViewContext::create_root_context(store, runtime_context).await?;
625        ChainStateView::load(context).await
626    }
627
628    #[instrument(level = "trace", skip_all, fields(%blob_id))]
629    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
630        let root_key = RootKey::BlobId(blob_id).bytes();
631        let store = self.database.open_shared(&root_key)?;
632        let test = store.contains_key(BLOB_KEY).await?;
633        #[cfg(with_metrics)]
634        metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
635        Ok(test)
636    }
637
638    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
639    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
640        let mut missing_blobs = Vec::new();
641        for blob_id in blob_ids {
642            let root_key = RootKey::BlobId(*blob_id).bytes();
643            let store = self.database.open_shared(&root_key)?;
644            if !store.contains_key(BLOB_KEY).await? {
645                missing_blobs.push(*blob_id);
646            }
647        }
648        #[cfg(with_metrics)]
649        metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
650        Ok(missing_blobs)
651    }
652
653    #[instrument(skip_all, fields(%blob_id))]
654    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
655        let root_key = RootKey::BlobId(blob_id).bytes();
656        let store = self.database.open_shared(&root_key)?;
657        let test = store.contains_key(BLOB_STATE_KEY).await?;
658        #[cfg(with_metrics)]
659        metrics::CONTAINS_BLOB_STATE_COUNTER
660            .with_label_values(&[])
661            .inc();
662        Ok(test)
663    }
664
665    #[instrument(skip_all, fields(%hash))]
666    async fn read_confirmed_block(
667        &self,
668        hash: CryptoHash,
669    ) -> Result<Option<ConfirmedBlock>, ViewError> {
670        let root_key = RootKey::BlockHash(hash).bytes();
671        let store = self.database.open_shared(&root_key)?;
672        let value = store.read_value(BLOCK_KEY).await?;
673        #[cfg(with_metrics)]
674        metrics::READ_CONFIRMED_BLOCK_COUNTER
675            .with_label_values(&[])
676            .inc();
677        Ok(value)
678    }
679
680    #[instrument(skip_all)]
681    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
682        &self,
683        hashes: I,
684    ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError> {
685        let hashes = hashes.into_iter().collect::<Vec<_>>();
686        if hashes.is_empty() {
687            return Ok(Vec::new());
688        }
689        let root_keys = Self::get_root_keys_for_certificates(&hashes);
690        let mut blocks = Vec::new();
691        for root_key in root_keys {
692            let store = self.database.open_shared(&root_key)?;
693            blocks.push(store.read_value(BLOCK_KEY).await?);
694        }
695        #[cfg(with_metrics)]
696        metrics::READ_CONFIRMED_BLOCKS_COUNTER
697            .with_label_values(&[])
698            .inc_by(hashes.len() as u64);
699        Ok(blocks)
700    }
701
702    #[instrument(skip_all, fields(%blob_id))]
703    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
704        let root_key = RootKey::BlobId(blob_id).bytes();
705        let store = self.database.open_shared(&root_key)?;
706        let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
707        #[cfg(with_metrics)]
708        metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
709        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
710    }
711
712    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
713    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
714        if blob_ids.is_empty() {
715            return Ok(Vec::new());
716        }
717        let mut blobs = Vec::new();
718        for blob_id in blob_ids {
719            blobs.push(self.read_blob(*blob_id).await?);
720        }
721        #[cfg(with_metrics)]
722        metrics::READ_BLOB_COUNTER
723            .with_label_values(&[])
724            .inc_by(blob_ids.len() as u64);
725        Ok(blobs)
726    }
727
728    #[instrument(skip_all, fields(%blob_id))]
729    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
730        let root_key = RootKey::BlobId(blob_id).bytes();
731        let store = self.database.open_shared(&root_key)?;
732        let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
733        #[cfg(with_metrics)]
734        metrics::READ_BLOB_STATE_COUNTER
735            .with_label_values(&[])
736            .inc();
737        Ok(blob_state)
738    }
739
740    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
741    async fn read_blob_states(
742        &self,
743        blob_ids: &[BlobId],
744    ) -> Result<Vec<Option<BlobState>>, ViewError> {
745        if blob_ids.is_empty() {
746            return Ok(Vec::new());
747        }
748        let mut blob_states = Vec::new();
749        for blob_id in blob_ids {
750            blob_states.push(self.read_blob_state(*blob_id).await?);
751        }
752        #[cfg(with_metrics)]
753        metrics::READ_BLOB_STATES_COUNTER
754            .with_label_values(&[])
755            .inc_by(blob_ids.len() as u64);
756        Ok(blob_states)
757    }
758
759    #[instrument(skip_all, fields(blob_id = %blob.id()))]
760    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
761        let mut batch = MultiPartitionBatch::new();
762        batch.add_blob(blob);
763        self.write_batch(batch).await?;
764        Ok(())
765    }
766
767    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
768    async fn maybe_write_blob_states(
769        &self,
770        blob_ids: &[BlobId],
771        blob_state: BlobState,
772    ) -> Result<(), ViewError> {
773        if blob_ids.is_empty() {
774            return Ok(());
775        }
776        let mut maybe_blob_states = Vec::new();
777        for blob_id in blob_ids {
778            let root_key = RootKey::BlobId(*blob_id).bytes();
779            let store = self.database.open_shared(&root_key)?;
780            let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
781            maybe_blob_states.push(maybe_blob_state);
782        }
783        let mut batch = MultiPartitionBatch::new();
784        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
785            match maybe_blob_state {
786                None => {
787                    batch.add_blob_state(*blob_id, &blob_state)?;
788                }
789                Some(state) => {
790                    if state.epoch < blob_state.epoch {
791                        batch.add_blob_state(*blob_id, &blob_state)?;
792                    }
793                }
794            }
795        }
796        // We tolerate race conditions because two active chains are likely to
797        // be both from the latest epoch, and otherwise failing to pick the
798        // more recent blob state has limited impact.
799        self.write_batch(batch).await?;
800        Ok(())
801    }
802
803    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
804    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
805        if blobs.is_empty() {
806            return Ok(Vec::new());
807        }
808        let mut batch = MultiPartitionBatch::new();
809        let mut blob_states = Vec::new();
810        for blob in blobs {
811            let root_key = RootKey::BlobId(blob.id()).bytes();
812            let store = self.database.open_shared(&root_key)?;
813            let has_state = store.contains_key(BLOB_STATE_KEY).await?;
814            blob_states.push(has_state);
815            if has_state {
816                batch.add_blob(blob);
817            }
818        }
819        self.write_batch(batch).await?;
820        Ok(blob_states)
821    }
822
823    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
824    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
825        if blobs.is_empty() {
826            return Ok(());
827        }
828        let mut batch = MultiPartitionBatch::new();
829        for blob in blobs {
830            batch.add_blob(blob);
831        }
832        self.write_batch(batch).await
833    }
834
835    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
836    async fn write_blobs_and_certificate(
837        &self,
838        blobs: &[Blob],
839        certificate: &ConfirmedBlockCertificate,
840    ) -> Result<(), ViewError> {
841        let mut batch = MultiPartitionBatch::new();
842        for blob in blobs {
843            batch.add_blob(blob);
844        }
845        batch.add_certificate(certificate)?;
846        self.write_batch(batch).await
847    }
848
849    #[instrument(skip_all, fields(%hash))]
850    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
851        let root_key = RootKey::BlockHash(hash).bytes();
852        let store = self.database.open_shared(&root_key)?;
853        let results = store.contains_keys(&get_block_keys()).await?;
854        #[cfg(with_metrics)]
855        metrics::CONTAINS_CERTIFICATE_COUNTER
856            .with_label_values(&[])
857            .inc();
858        Ok(results[0] && results[1])
859    }
860
861    #[instrument(skip_all, fields(%hash))]
862    async fn read_certificate(
863        &self,
864        hash: CryptoHash,
865    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
866        let root_key = RootKey::BlockHash(hash).bytes();
867        let store = self.database.open_shared(&root_key)?;
868        let values = store.read_multi_values_bytes(&get_block_keys()).await?;
869        #[cfg(with_metrics)]
870        metrics::READ_CERTIFICATE_COUNTER
871            .with_label_values(&[])
872            .inc();
873        Self::deserialize_certificate(&values, hash)
874    }
875
876    #[instrument(skip_all)]
877    async fn read_certificates(
878        &self,
879        hashes: &[CryptoHash],
880    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
881        let raw_certs = self.read_certificates_raw(hashes).await?;
882
883        raw_certs
884            .into_iter()
885            .zip(hashes)
886            .map(|(maybe_raw, hash)| {
887                let Some((lite_cert_bytes, confirmed_block_bytes)) = maybe_raw else {
888                    return Ok(None);
889                };
890                let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
891                let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
892                assert_eq!(&value.hash(), hash);
893                let certificate = cert
894                    .with_value(value)
895                    .ok_or(ViewError::InconsistentEntries)?;
896                Ok(Some(certificate))
897            })
898            .collect()
899    }
900
901    #[instrument(skip_all)]
902    async fn read_certificates_raw(
903        &self,
904        hashes: &[CryptoHash],
905    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
906        if hashes.is_empty() {
907            return Ok(Vec::new());
908        }
909        let root_keys = Self::get_root_keys_for_certificates(hashes);
910        let mut values = Vec::new();
911        for root_key in root_keys {
912            let store = self.database.open_shared(&root_key)?;
913            values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
914        }
915        #[cfg(with_metrics)]
916        metrics::READ_CERTIFICATES_COUNTER
917            .with_label_values(&[])
918            .inc_by(hashes.len() as u64);
919        Ok(values
920            .chunks_exact(2)
921            .map(|chunk| {
922                let lite_cert_bytes = chunk[0].as_ref()?;
923                let confirmed_block_bytes = chunk[1].as_ref()?;
924                Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
925            })
926            .collect())
927    }
928
929    async fn read_certificate_hashes_by_heights(
930        &self,
931        chain_id: ChainId,
932        heights: &[BlockHeight],
933    ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
934        if heights.is_empty() {
935            return Ok(Vec::new());
936        }
937
938        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
939        let store = self.database.open_shared(&index_root_key)?;
940        let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
941        let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
942        let hash_options: Vec<Option<CryptoHash>> = hash_bytes
943            .into_iter()
944            .map(|opt| {
945                opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
946                    .transpose()
947            })
948            .collect::<Result<_, _>>()?;
949
950        Ok(hash_options)
951    }
952
    /// Reads the raw `(lite certificate, confirmed block)` byte pairs for the
    /// given heights of `chain_id`, resolving each height to a hash through
    /// the per-chain height index. Heights with no index entry yield `None`.
    /// Duplicate heights in the input are supported: each occurrence of the
    /// same height is filled in the output.
    #[instrument(skip_all)]
    async fn read_certificates_by_heights_raw(
        &self,
        chain_id: ChainId,
        heights: &[BlockHeight],
    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
        let hashes: Vec<Option<CryptoHash>> = self
            .read_certificate_hashes_by_heights(chain_id, heights)
            .await?;

        // Map from hash to all indices in the heights array (handles duplicates).
        let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
        for (index, maybe_hash) in hashes.iter().enumerate() {
            if let Some(hash) = maybe_hash {
                indices.entry(*hash).or_default().push(index);
            }
        }

        // Deduplicate hashes for the storage query.
        let unique_hashes = indices.keys().copied().collect::<Vec<_>>();

        let mut result = vec![None; heights.len()];

        for (raw_cert, hash) in self
            .read_certificates_raw(&unique_hashes)
            .await?
            .into_iter()
            .zip(unique_hashes)
        {
            if let Some(idx_list) = indices.get(&hash) {
                // Fan the fetched bytes out to every position that asked for
                // this hash.
                for &index in idx_list {
                    result[index] = raw_cert.clone();
                }
            } else {
                // This should be unreachable, since `unique_hashes` was built
                // from the keys of `indices`; log an error if it happens.
                tracing::error!(?hash, "certificate hash not found in indices map",);
            }
        }

        Ok(result)
    }
994
995    #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
996    async fn read_certificates_by_heights(
997        &self,
998        chain_id: ChainId,
999        heights: &[BlockHeight],
1000    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
1001        self.read_certificates_by_heights_raw(chain_id, heights)
1002            .await?
1003            .into_iter()
1004            .map(|maybe_raw| match maybe_raw {
1005                None => Ok(None),
1006                Some((lite_cert_bytes, confirmed_block_bytes)) => {
1007                    let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
1008                    let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
1009                    let certificate = cert
1010                        .with_value(value)
1011                        .ok_or(ViewError::InconsistentEntries)?;
1012                    Ok(Some(certificate))
1013                }
1014            })
1015            .collect()
1016    }
1017
1018    #[instrument(skip_all, fields(event_id = ?event_id))]
1019    async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
1020        let event_key = to_event_key(&event_id);
1021        let root_key = RootKey::Event(event_id.chain_id).bytes();
1022        let store = self.database.open_shared(&root_key)?;
1023        let event = store.read_value_bytes(&event_key).await?;
1024        #[cfg(with_metrics)]
1025        metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
1026        Ok(event)
1027    }
1028
1029    #[instrument(skip_all, fields(event_id = ?event_id))]
1030    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1031        let event_key = to_event_key(&event_id);
1032        let root_key = RootKey::Event(event_id.chain_id).bytes();
1033        let store = self.database.open_shared(&root_key)?;
1034        let exists = store.contains_key(&event_key).await?;
1035        #[cfg(with_metrics)]
1036        metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
1037        Ok(exists)
1038    }
1039
    /// Returns all events of the given stream on `chain_id` whose index is at
    /// least `start_index`, together with their indices.
    #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
    async fn read_events_from_index(
        &self,
        chain_id: &ChainId,
        stream_id: &StreamId,
        start_index: u32,
    ) -> Result<Vec<IndexAndEvent>, ViewError> {
        let root_key = RootKey::Event(*chain_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let mut keys = Vec::new();
        let mut indices = Vec::new();
        // Event keys are `bcs(stream_id) ++ bcs(index)`, so listing by the
        // stream prefix yields the bcs-encoded index as the remaining suffix.
        // Serializing into a `Vec` cannot fail, hence the `unwrap`.
        let prefix = bcs::to_bytes(stream_id).unwrap();
        for short_key in store.find_keys_by_prefix(&prefix).await? {
            let index = bcs::from_bytes::<u32>(&short_key)?;
            if index >= start_index {
                let mut key = prefix.clone();
                key.extend(short_key);
                keys.push(key);
                indices.push(index);
            }
        }
        let values = store.read_multi_values_bytes(&keys).await?;
        let mut returned_values = Vec::new();
        for (index, value) in indices.into_iter().zip(values) {
            // Every key was listed just above, so its value is expected to be
            // present. NOTE(review): a concurrent deletion between the listing
            // and this read would make the `unwrap` panic — confirm events are
            // never deleted.
            let event = value.unwrap();
            returned_values.push(IndexAndEvent { index, event });
        }
        Ok(returned_values)
    }
1069
1070    #[instrument(skip_all)]
1071    async fn write_events(
1072        &self,
1073        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1074    ) -> Result<(), ViewError> {
1075        let mut batch = MultiPartitionBatch::new();
1076        for (event_id, value) in events {
1077            batch.add_event(&event_id, value);
1078        }
1079        self.write_batch(batch).await
1080    }
1081
1082    #[instrument(skip_all)]
1083    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1084        let root_key = RootKey::NetworkDescription.bytes();
1085        let store = self.database.open_shared(&root_key)?;
1086        let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1087        #[cfg(with_metrics)]
1088        metrics::READ_NETWORK_DESCRIPTION
1089            .with_label_values(&[])
1090            .inc();
1091        Ok(maybe_value)
1092    }
1093
1094    #[instrument(skip_all)]
1095    async fn write_network_description(
1096        &self,
1097        information: &NetworkDescription,
1098    ) -> Result<(), ViewError> {
1099        let mut batch = MultiPartitionBatch::new();
1100        batch.add_network_description(information)?;
1101        self.write_batch(batch).await?;
1102        Ok(())
1103    }
1104
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1108
    /// Creates a root view context over the partition reserved for the given
    /// block exporter.
    #[instrument(skip_all)]
    async fn block_exporter_context(
        &self,
        block_exporter_id: u32,
    ) -> Result<Self::BlockExporterContext, ViewError> {
        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
        // Opened exclusively: each exporter owns its partition.
        let store = self.database.open_exclusive(&root_key)?;
        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
    }
1118
1119    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1120        let root_keys = self.database.list_root_keys().await?;
1121        let mut blob_ids = Vec::new();
1122        for root_key in root_keys {
1123            if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1124                let root_key_red = &root_key[1..];
1125                let blob_id = bcs::from_bytes(root_key_red)?;
1126                blob_ids.push(blob_id);
1127            }
1128        }
1129        Ok(blob_ids)
1130    }
1131
1132    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1133        let root_keys = self.database.list_root_keys().await?;
1134        let mut chain_ids = Vec::new();
1135        for root_key in root_keys {
1136            if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1137                let root_key_red = &root_key[1..];
1138                let chain_id = bcs::from_bytes(root_key_red)?;
1139                chain_ids.push(chain_id);
1140            }
1141        }
1142        Ok(chain_ids)
1143    }
1144
1145    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
1146        let root_keys = self.database.list_root_keys().await?;
1147        let mut event_ids = Vec::new();
1148        for root_key in root_keys {
1149            if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
1150                let root_key_red = &root_key[1..];
1151                let chain_id = bcs::from_bytes(root_key_red)?;
1152                let store = self.database.open_shared(&root_key)?;
1153                let keys = store.find_keys_by_prefix(&[]).await?;
1154                for key in keys {
1155                    let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
1156                    let event_id = EventId {
1157                        chain_id,
1158                        stream_id: restricted_event_id.stream_id,
1159                        index: restricted_event_id.index,
1160                    };
1161                    event_ids.push(event_id);
1162                }
1163            }
1164        }
1165        Ok(event_ids)
1166    }
1167}
1168
1169impl<Database, C> DbStorage<Database, C>
1170where
1171    Database: KeyValueDatabase + Clone,
1172    Database::Store: KeyValueStore + Clone,
1173    C: Clock,
1174    Database::Error: Send + Sync,
1175{
1176    #[instrument(skip_all)]
1177    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1178        hashes
1179            .iter()
1180            .map(|hash| RootKey::BlockHash(*hash).bytes())
1181            .collect()
1182    }
1183
1184    #[instrument(skip_all)]
1185    fn deserialize_certificate(
1186        pair: &[Option<Vec<u8>>],
1187        hash: CryptoHash,
1188    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1189        let Some(cert_bytes) = pair[0].as_ref() else {
1190            return Ok(None);
1191        };
1192        let Some(value_bytes) = pair[1].as_ref() else {
1193            return Ok(None);
1194        };
1195        let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1196        let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1197        assert_eq!(value.hash(), hash);
1198        let certificate = cert
1199            .with_value(value)
1200            .ok_or(ViewError::InconsistentEntries)?;
1201        Ok(Some(certificate))
1202    }
1203
1204    #[instrument(skip_all)]
1205    async fn write_entry(
1206        store: &Database::Store,
1207        key_values: Vec<(Vec<u8>, Vec<u8>)>,
1208    ) -> Result<(), ViewError> {
1209        let mut batch = Batch::new();
1210        for (key, value) in key_values {
1211            batch.put_key_value_bytes(key, value);
1212        }
1213        store.write_batch(batch).await?;
1214        Ok(())
1215    }
1216
1217    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1218    async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1219        if batch.keys_value_bytes.is_empty() {
1220            return Ok(());
1221        }
1222        let mut futures = Vec::new();
1223        for (root_key, key_values) in batch.keys_value_bytes {
1224            let store = self.database.open_shared(&root_key)?;
1225            futures.push(async move { Self::write_entry(&store, key_values).await });
1226        }
1227        futures::future::try_join_all(futures).await?;
1228        Ok(())
1229    }
1230}
1231
impl<Database, C> DbStorage<Database, C> {
    /// Creates a storage from its parts, with the default execution runtime
    /// configuration and empty user contract/service caches.
    fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            // The `Arc` here is required on native but useless on the Web.
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            // NOTE(review): the pool size of 20 is hard-coded — confirm sizing.
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Sets whether contract log messages should be output.
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1253
1254impl<Database> DbStorage<Database, WallClock>
1255where
1256    Database: KeyValueDatabase + Clone + 'static,
1257    Database::Error: Send + Sync,
1258    Database::Store: KeyValueStore + Clone + 'static,
1259{
1260    pub async fn maybe_create_and_connect(
1261        config: &Database::Config,
1262        namespace: &str,
1263        wasm_runtime: Option<WasmRuntime>,
1264    ) -> Result<Self, Database::Error> {
1265        let database = Database::maybe_create_and_connect(config, namespace).await?;
1266        Ok(Self::new(database, wasm_runtime, WallClock))
1267    }
1268
1269    pub async fn connect(
1270        config: &Database::Config,
1271        namespace: &str,
1272        wasm_runtime: Option<WasmRuntime>,
1273    ) -> Result<Self, Database::Error> {
1274        let database = Database::connect(config, namespace).await?;
1275        Ok(Self::new(database, wasm_runtime, WallClock))
1276    }
1277}
1278
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Builds a storage over a freshly generated test namespace, panicking on
    /// any setup failure.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        Self::new_for_testing(config, &namespace, wasm_runtime, TestClock::new())
            .await
            .unwrap()
    }

    /// Recreates the namespace from scratch and connects a storage to it.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        Ok(Self::new(database, wasm_runtime, clock))
    }
}
1309
1310#[cfg(test)]
1311mod tests {
1312    use linera_base::{
1313        crypto::{CryptoHash, TestString},
1314        data_types::{BlockHeight, Epoch, Round, Timestamp},
1315        identifiers::{
1316            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
1317            StreamName,
1318        },
1319    };
1320    use linera_chain::{
1321        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
1322        types::ConfirmedBlockCertificate,
1323    };
1324    use linera_views::{
1325        memory::MemoryDatabase,
1326        store::{KeyValueDatabase, ReadableKeyValueStore as _},
1327    };
1328
1329    use crate::{
1330        db_storage::{
1331            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
1332            EVENT_ID_TAG,
1333        },
1334        DbStorage, Storage, TestClock,
1335    };
1336
1337    // Several functionalities of the storage rely on the way that the serialization
1338    // is done. Thus we need to check that the serialization works in the way that
1339    // we expect.
1340
1341    // The listing of the blobs in `list_blob_ids` depends on the serialization
1342    // of `RootKey::Blob`.
1343    #[test]
1344    fn test_root_key_blob_serialization() {
1345        let hash = CryptoHash::default();
1346        let blob_type = BlobType::default();
1347        let blob_id = BlobId::new(hash, blob_type);
1348        let root_key = RootKey::BlobId(blob_id).bytes();
1349        assert_eq!(root_key[0], BLOB_ID_TAG);
1350        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
1351    }
1352
1353    // The listing of the chains in `list_chain_ids` depends on the serialization
1354    // of `RootKey::ChainState`.
1355    #[test]
1356    fn test_root_key_chainstate_serialization() {
1357        let hash = CryptoHash::default();
1358        let chain_id = ChainId(hash);
1359        let root_key = RootKey::ChainState(chain_id).bytes();
1360        assert_eq!(root_key[0], CHAIN_ID_TAG);
1361        assert_eq!(
1362            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
1363            chain_id
1364        );
1365    }
1366
1367    // The listing of the events in `read_events_from_index` depends on the
1368    // serialization of `RootKey::Event`.
1369    #[test]
1370    fn test_root_key_event_serialization() {
1371        let hash = CryptoHash::test_hash("49");
1372        let chain_id = ChainId(hash);
1373        let application_description_hash = CryptoHash::test_hash("42");
1374        let application_id = ApplicationId::new(application_description_hash);
1375        let application_id = GenericApplicationId::User(application_id);
1376        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
1377        let stream_id = StreamId {
1378            application_id,
1379            stream_name,
1380        };
1381        let prefix = bcs::to_bytes(&stream_id).unwrap();
1382
1383        let index = 1567;
1384        let event_id = EventId {
1385            chain_id,
1386            stream_id,
1387            index,
1388        };
1389        let root_key = RootKey::Event(chain_id).bytes();
1390        assert_eq!(root_key[0], EVENT_ID_TAG);
1391        let key = to_event_key(&event_id);
1392        assert!(key.starts_with(&prefix));
1393    }
1394
1395    // The height index lookup depends on the serialization of RootKey::BlockByHeight
1396    // and to_height_key, following the same pattern as Event.
1397    #[test]
1398    fn test_root_key_block_by_height_serialization() {
1399        use linera_base::data_types::BlockHeight;
1400
1401        let hash = CryptoHash::default();
1402        let chain_id = ChainId(hash);
1403        let height = BlockHeight(42);
1404
1405        // RootKey::BlockByHeight uses only ChainId for partitioning (like Event)
1406        let root_key = RootKey::BlockByHeight(chain_id).bytes();
1407        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
1408        assert_eq!(deserialized_chain_id, chain_id);
1409
1410        // Height is encoded as a key (like index in Event)
1411        let height_key = to_height_key(height);
1412        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
1413        assert_eq!(deserialized_height, height);
1414    }
1415
    // Writing a certificate must also populate the per-chain height index
    // (partition `RootKey::BlockByHeight`, key `to_height_key(height)`),
    // mapping the block height to the certificate hash.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        // Create test storage
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Create a test certificate at a specific height
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: CryptoHash::new(&TestString::new("transactions_hash")),
                messages_hash: CryptoHash::new(&TestString::new("messages_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_msg_blocks_hash",
                )),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_event_blocks_hash",
                )),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_responses_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("operation_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);

        // Write certificate
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Verify height index was created (following Event pattern)
        let hash = certificate.hash();
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();

        assert!(value_bytes.is_some(), "Height index was not created");
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
    }
1477
1478    #[cfg(with_testing)]
1479    #[tokio::test]
1480    async fn test_read_certificates_by_heights() {
1481        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1482        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1483
1484        // Write certificates at heights 1, 3, 5
1485        let mut batch = MultiPartitionBatch::new();
1486        let mut expected_certs = vec![];
1487
1488        for height in [1, 3, 5] {
1489            let block = Block {
1490                header: BlockHeader {
1491                    chain_id,
1492                    epoch: Epoch::ZERO,
1493                    height: BlockHeight(height),
1494                    timestamp: Timestamp::from(0),
1495                    state_hash: CryptoHash::new(&TestString::new("state_hash_{height}")),
1496                    previous_block_hash: None,
1497                    authenticated_owner: None,
1498                    transactions_hash: CryptoHash::new(&TestString::new("tx_hash_{height}")),
1499                    messages_hash: CryptoHash::new(&TestString::new("msg_hash_{height}")),
1500                    previous_message_blocks_hash: CryptoHash::new(&TestString::new(
1501                        "pmb_hash_{height}",
1502                    )),
1503                    previous_event_blocks_hash: CryptoHash::new(&TestString::new(
1504                        "peb_hash_{height}",
1505                    )),
1506                    oracle_responses_hash: CryptoHash::new(&TestString::new(
1507                        "oracle_hash_{height}",
1508                    )),
1509                    events_hash: CryptoHash::new(&TestString::new("events_hash_{height}")),
1510                    blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_{height}")),
1511                    operation_results_hash: CryptoHash::new(&TestString::new(
1512                        "op_results_hash_{height}",
1513                    )),
1514                },
1515                body: BlockBody {
1516                    transactions: vec![],
1517                    messages: vec![],
1518                    previous_message_blocks: Default::default(),
1519                    previous_event_blocks: Default::default(),
1520                    oracle_responses: vec![],
1521                    events: vec![],
1522                    blobs: vec![],
1523                    operation_results: vec![],
1524                },
1525            };
1526            let confirmed_block = ConfirmedBlock::new(block);
1527            let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1528            expected_certs.push((height, cert.clone()));
1529            batch.add_certificate(&cert).unwrap();
1530        }
1531        storage.write_batch(batch).await.unwrap();
1532
1533        // Test: Read in order [1, 3, 5]
1534        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
1535        let result = storage
1536            .read_certificates_by_heights(chain_id, &heights)
1537            .await
1538            .unwrap();
1539        assert_eq!(result.len(), 3);
1540        assert_eq!(
1541            result[0].as_ref().unwrap().hash(),
1542            expected_certs[0].1.hash()
1543        );
1544        assert_eq!(
1545            result[1].as_ref().unwrap().hash(),
1546            expected_certs[1].1.hash()
1547        );
1548        assert_eq!(
1549            result[2].as_ref().unwrap().hash(),
1550            expected_certs[2].1.hash()
1551        );
1552
1553        // Test: Read out of order [5, 1, 3]
1554        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
1555        let result = storage
1556            .read_certificates_by_heights(chain_id, &heights)
1557            .await
1558            .unwrap();
1559        assert_eq!(result.len(), 3);
1560        assert_eq!(
1561            result[0].as_ref().unwrap().hash(),
1562            expected_certs[2].1.hash()
1563        );
1564        assert_eq!(
1565            result[1].as_ref().unwrap().hash(),
1566            expected_certs[0].1.hash()
1567        );
1568        assert_eq!(
1569            result[2].as_ref().unwrap().hash(),
1570            expected_certs[1].1.hash()
1571        );
1572
1573        // Test: Read with missing heights [1, 2, 3]
1574        let heights = vec![
1575            BlockHeight(1),
1576            BlockHeight(2),
1577            BlockHeight(3),
1578            BlockHeight(3),
1579        ];
1580        let result = storage
1581            .read_certificates_by_heights(chain_id, &heights)
1582            .await
1583            .unwrap();
1584        assert_eq!(result.len(), 4); // BlockHeight(3) was duplicated.
1585        assert!(result[0].is_some());
1586        assert!(result[1].is_none()); // Height 2 doesn't exist
1587        assert!(result[2].is_some());
1588        assert!(result[3].is_some());
1589        assert_eq!(
1590            result[2].as_ref().unwrap().hash(),
1591            result[3].as_ref().unwrap().hash()
1592        ); // Both correspond to height 3
1593
1594        // Test: Empty heights
1595        let heights = vec![];
1596        let result = storage
1597            .read_certificates_by_heights(chain_id, &heights)
1598            .await
1599            .unwrap();
1600        assert_eq!(result.len(), 0);
1601    }
1602
1603    #[cfg(with_testing)]
1604    #[tokio::test]
1605    async fn test_read_certificates_by_heights_multiple_chains() {
1606        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1607
1608        // Create certificates for two different chains at same heights
1609        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
1610        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));
1611
1612        let mut batch = MultiPartitionBatch::new();
1613
1614        let block_a = Block {
1615            header: BlockHeader {
1616                chain_id: chain_a,
1617                epoch: Epoch::ZERO,
1618                height: BlockHeight(10),
1619                timestamp: Timestamp::from(0),
1620                state_hash: CryptoHash::new(&TestString::new("state_hash_a")),
1621                previous_block_hash: None,
1622                authenticated_owner: None,
1623                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_a")),
1624                messages_hash: CryptoHash::new(&TestString::new("msg_hash_a")),
1625                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_a")),
1626                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_a")),
1627                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_a")),
1628                events_hash: CryptoHash::new(&TestString::new("events_hash_a")),
1629                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_a")),
1630                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_a")),
1631            },
1632            body: BlockBody {
1633                transactions: vec![],
1634                messages: vec![],
1635                previous_message_blocks: Default::default(),
1636                previous_event_blocks: Default::default(),
1637                oracle_responses: vec![],
1638                events: vec![],
1639                blobs: vec![],
1640                operation_results: vec![],
1641            },
1642        };
1643        let confirmed_block_a = ConfirmedBlock::new(block_a);
1644        let cert_a = ConfirmedBlockCertificate::new(confirmed_block_a, Round::Fast, vec![]);
1645        batch.add_certificate(&cert_a).unwrap();
1646
1647        let block_b = Block {
1648            header: BlockHeader {
1649                chain_id: chain_b,
1650                epoch: Epoch::ZERO,
1651                height: BlockHeight(10),
1652                timestamp: Timestamp::from(0),
1653                state_hash: CryptoHash::new(&TestString::new("state_hash_b")),
1654                previous_block_hash: None,
1655                authenticated_owner: None,
1656                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_b")),
1657                messages_hash: CryptoHash::new(&TestString::new("msg_hash_b")),
1658                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_b")),
1659                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_b")),
1660                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_b")),
1661                events_hash: CryptoHash::new(&TestString::new("events_hash_b")),
1662                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_b")),
1663                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_b")),
1664            },
1665            body: BlockBody {
1666                transactions: vec![],
1667                messages: vec![],
1668                previous_message_blocks: Default::default(),
1669                previous_event_blocks: Default::default(),
1670                oracle_responses: vec![],
1671                events: vec![],
1672                blobs: vec![],
1673                operation_results: vec![],
1674            },
1675        };
1676        let confirmed_block_b = ConfirmedBlock::new(block_b);
1677        let cert_b = ConfirmedBlockCertificate::new(confirmed_block_b, Round::Fast, vec![]);
1678        batch.add_certificate(&cert_b).unwrap();
1679
1680        storage.write_batch(batch).await.unwrap();
1681
1682        // Read from chain A - should get cert A
1683        let result = storage
1684            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
1685            .await
1686            .unwrap();
1687        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());
1688
1689        // Read from chain B - should get cert B
1690        let result = storage
1691            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
1692            .await
1693            .unwrap();
1694        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());
1695
1696        // Read from chain A for height that only chain B has - should get None
1697        let result = storage
1698            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
1699            .await
1700            .unwrap();
1701        assert!(result[0].is_none());
1702    }
1703
1704    #[cfg(with_testing)]
1705    #[tokio::test]
1706    async fn test_read_certificates_by_heights_consistency() {
1707        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1708        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1709
1710        // Write certificate
1711        let mut batch = MultiPartitionBatch::new();
1712        let block = Block {
1713            header: BlockHeader {
1714                chain_id,
1715                epoch: Epoch::ZERO,
1716                height: BlockHeight(7),
1717                timestamp: Timestamp::from(0),
1718                state_hash: CryptoHash::new(&TestString::new("state_hash")),
1719                previous_block_hash: None,
1720                authenticated_owner: None,
1721                transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
1722                messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
1723                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
1724                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
1725                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
1726                events_hash: CryptoHash::new(&TestString::new("events_hash")),
1727                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
1728                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
1729            },
1730            body: BlockBody {
1731                transactions: vec![],
1732                messages: vec![],
1733                previous_message_blocks: Default::default(),
1734                previous_event_blocks: Default::default(),
1735                oracle_responses: vec![],
1736                events: vec![],
1737                blobs: vec![],
1738                operation_results: vec![],
1739            },
1740        };
1741        let confirmed_block = ConfirmedBlock::new(block);
1742        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1743        let hash = cert.hash();
1744        batch.add_certificate(&cert).unwrap();
1745        storage.write_batch(batch).await.unwrap();
1746
1747        // Read by hash
1748        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();
1749
1750        // Read by height
1751        let certs_by_height = storage
1752            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
1753            .await
1754            .unwrap();
1755        let cert_by_height = certs_by_height[0].as_ref().unwrap();
1756
1757        // Should be identical
1758        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
1759        assert_eq!(
1760            cert_by_hash.value().block().header,
1761            cert_by_height.value().block().header
1762        );
1763    }
1764}