linera_storage/
db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    fmt::Debug,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_chain::{
19    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
20    ChainStateView,
21};
22use linera_execution::{
23    BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
24};
25use linera_views::{
26    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
27    batch::Batch,
28    context::ViewContext,
29    store::{
30        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
31    },
32    views::View,
33    ViewError,
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37#[cfg(with_testing)]
38use {
39    futures::channel::oneshot::{self, Receiver},
40    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
41    std::cmp::Reverse,
42};
43
44use crate::{ChainRuntimeContext, Clock, Storage};
45
#[cfg(with_metrics)]
pub mod metrics {
    //! Prometheus metrics recorded by the storage layer.
    //!
    //! Every metric is registered lazily on first use and carries no labels.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// The metric counting how often a blob is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// The metric counting how often blob states are read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// The metric counting how often an event is tested for existence from storage.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// The metric counting how often the network description is read from storage.
    ///
    /// NOTE(review): the exported name is `network_description`, without the `read_` prefix
    /// used by the other read counters. It is kept as-is here because renaming an exported
    /// metric would affect existing dashboards and alerts.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
242
/// The key under which the bytes of a blob are stored. The blob ID itself is contained in
/// the root key, so this key only distinguishes the blob from its state.
const BLOB_KEY: &[u8] = &[0];

/// The key under which the state of a blob is stored. The blob ID itself is contained in
/// the root key, i.e. a blob and its state share the same partition.
const BLOB_STATE_KEY: &[u8] = &[1];

/// The key used for lite certificates. The hash of the certified block is contained in the
/// root key.
const LITE_CERTIFICATE_KEY: &[u8] = &[2];

/// The key used for confirmed blocks. The hash of the block is contained in the root key,
/// i.e. a block and its lite certificate share the same partition.
const BLOCK_KEY: &[u8] = &[3];

/// The key used for the network description, within the network description partition.
const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];
257
258fn get_block_keys() -> Vec<Vec<u8>> {
259    vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
260}
261
262#[derive(Default)]
263#[allow(clippy::type_complexity)]
264struct MultiPartitionBatch {
265    keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
266}
267
268impl MultiPartitionBatch {
269    fn new() -> Self {
270        Self::default()
271    }
272
273    fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
274        let entry = self.keys_value_bytes.entry(root_key).or_default();
275        entry.extend(key_values);
276    }
277
278    fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
279        self.put_key_values(root_key, vec![(key, value)]);
280    }
281
282    fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
283        #[cfg(with_metrics)]
284        metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
285        let root_key = RootKey::BlobId(blob.id()).bytes();
286        let key = BLOB_KEY.to_vec();
287        self.put_key_value(root_key, key, blob.bytes().to_vec());
288        Ok(())
289    }
290
291    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
292        let root_key = RootKey::BlobId(blob_id).bytes();
293        let key = BLOB_STATE_KEY.to_vec();
294        let value = bcs::to_bytes(blob_state)?;
295        self.put_key_value(root_key, key, value);
296        Ok(())
297    }
298
299    /// Adds a certificate to the batch.
300    ///
301    /// Writes both the certificate data (indexed by hash) and a height index
302    /// (mapping chain_id + height to hash).
303    ///
304    /// Note: If called multiple times with the same `(chain_id, height)`, the height
305    /// index will be overwritten. The caller is responsible for ensuring that
306    /// certificates at the same height have the same hash.
307    fn add_certificate(
308        &mut self,
309        certificate: &ConfirmedBlockCertificate,
310    ) -> Result<(), ViewError> {
311        #[cfg(with_metrics)]
312        metrics::WRITE_CERTIFICATE_COUNTER
313            .with_label_values(&[])
314            .inc();
315        let hash = certificate.hash();
316
317        // Write certificate data by hash
318        let root_key = RootKey::BlockHash(hash).bytes();
319        let mut key_values = Vec::new();
320        let key = LITE_CERTIFICATE_KEY.to_vec();
321        let value = bcs::to_bytes(&certificate.lite_certificate())?;
322        key_values.push((key, value));
323        let key = BLOCK_KEY.to_vec();
324        let value = bcs::to_bytes(&certificate.value())?;
325        key_values.push((key, value));
326        self.put_key_values(root_key, key_values);
327
328        // Write height index: chain_id -> height -> hash
329        let chain_id = certificate.value().block().header.chain_id;
330        let height = certificate.value().block().header.height;
331        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
332        let height_key = to_height_key(height);
333        let index_value = bcs::to_bytes(&hash)?;
334        self.put_key_value(index_root_key, height_key, index_value);
335
336        Ok(())
337    }
338
339    fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
340        #[cfg(with_metrics)]
341        metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
342        let key = to_event_key(&event_id);
343        let root_key = RootKey::Event(event_id.chain_id).bytes();
344        self.put_key_value(root_key, key, value);
345        Ok(())
346    }
347
348    fn add_network_description(
349        &mut self,
350        information: &NetworkDescription,
351    ) -> Result<(), ViewError> {
352        #[cfg(with_metrics)]
353        metrics::WRITE_NETWORK_DESCRIPTION
354            .with_label_values(&[])
355            .inc();
356        let root_key = RootKey::NetworkDescription.bytes();
357        let key = NETWORK_DESCRIPTION_KEY.to_vec();
358        let value = bcs::to_bytes(information)?;
359        self.put_key_value(root_key, key, value);
360        Ok(())
361    }
362}
363
/// Main implementation of the [`Storage`] trait.
///
/// The storage is backed by a key-value `Database`; each kind of data lives in its own
/// partition, identified by a serialized `RootKey`.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying database, from which per-root-key stores are opened.
    database: Arc<Database>,
    /// The clock returned by `Storage::clock`.
    clock: Clock,
    /// The thread pool returned by `Storage::thread_pool` and handed to chain runtime contexts.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime, if any. NOTE(review): presumably used to load application
    /// bytecode — not used in the code visible here; confirm.
    wasm_runtime: Option<WasmRuntime>,
    /// User contract code by application ID, shared with every chain runtime context.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// User service code by application ID, shared with every chain runtime context.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// The execution runtime configuration copied into every chain runtime context.
    execution_runtime_config: ExecutionRuntimeConfig,
}
375
/// The root keys of the partitions used by the storage.
///
/// A root key is the BCS serialization of one of these variants (see `RootKey::bytes`).
///
/// NOTE(review): the `*_TAG` constants below match the positions of the `ChainState`,
/// `BlobId` and `Event` variants, so they presumably mirror the leading byte of the
/// serialized variant. If so, inserting or reordering variants requires updating them (and
/// changes the layout of existing databases) — confirm against the BCS enum encoding.
#[derive(Debug, Serialize, Deserialize)]
enum RootKey {
    /// The partition holding the network description.
    NetworkDescription,
    /// NOTE(review): presumably the state of the block exporter with the given ID — not
    /// used in the code visible here; confirm.
    BlockExporterState(u32),
    /// The partition holding the state of a chain.
    ChainState(ChainId),
    /// The partition holding the lite certificate and confirmed block with the given hash.
    BlockHash(CryptoHash),
    /// The partition holding a blob and its blob state.
    BlobId(BlobId),
    /// The partition holding the events of a chain.
    Event(ChainId),
    /// The partition holding the height-to-hash index of a chain.
    BlockByHeight(ChainId),
}

/// The first byte of a serialized `RootKey::ChainState`; used to recognize chain states.
const CHAIN_ID_TAG: u8 = 2;
/// NOTE(review): presumably the first byte of a serialized `RootKey::BlobId` — confirm.
const BLOB_ID_TAG: u8 = 4;
/// NOTE(review): presumably the first byte of a serialized `RootKey::Event` — confirm.
const EVENT_ID_TAG: u8 = 5;

impl RootKey {
    /// Returns the BCS serialization of this root key.
    ///
    /// # Panics
    ///
    /// Panics if the BCS serialization fails.
    fn bytes(&self) -> Vec<u8> {
        bcs::to_bytes(self).unwrap()
    }
}
396
/// The part of an `EventId` that is not the chain ID: the stream and the index within it.
///
/// Its BCS serialization is the key of an event inside the event partition of its chain
/// (the chain ID is already part of the root key).
#[derive(Debug, Serialize, Deserialize)]
struct RestrictedEventId {
    /// The stream the event belongs to.
    pub stream_id: StreamId,
    /// The index of the event, copied from `EventId::index`.
    pub index: u32,
}
402
403fn to_event_key(event_id: &EventId) -> Vec<u8> {
404    let restricted_event_id = RestrictedEventId {
405        stream_id: event_id.stream_id.clone(),
406        index: event_id.index,
407    };
408    bcs::to_bytes(&restricted_event_id).unwrap()
409}
410
/// Returns the key of a block height inside a per-chain height index: the BCS serialization
/// of `height`.
///
/// # Panics
///
/// Panics if the BCS serialization fails.
pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
    bcs::to_bytes(&height).unwrap()
}
414
415fn is_chain_state(root_key: &[u8]) -> bool {
416    if root_key.is_empty() {
417        return false;
418    }
419    root_key[0] == CHAIN_ID_TAG
420}
421
422/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
423/// chain states into the first store.
424#[derive(Clone, Copy)]
425pub struct ChainStatesFirstAssignment;
426
427impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
428    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
429        if root_key.is_empty() {
430            return Ok(StoreInUse::Second);
431        }
432        let store = match is_chain_state(root_key) {
433            true => StoreInUse::First,
434            false => StoreInUse::Second,
435        };
436        Ok(store)
437    }
438}
439
440/// A `Clock` implementation using the system clock.
441#[derive(Clone)]
442pub struct WallClock;
443
444#[cfg_attr(not(web), async_trait)]
445#[cfg_attr(web, async_trait(?Send))]
446impl Clock for WallClock {
447    fn current_time(&self) -> Timestamp {
448        Timestamp::now()
449    }
450
451    async fn sleep(&self, delta: TimeDelta) {
452        linera_base::time::timer::sleep(delta.as_duration()).await
453    }
454
455    async fn sleep_until(&self, timestamp: Timestamp) {
456        let delta = timestamp.delta_since(Timestamp::now());
457        if delta > TimeDelta::ZERO {
458            self.sleep(delta).await
459        }
460    }
461}
462
/// The state shared by all clones of a `TestClock`.
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current time of the clock.
    time: Timestamp,
    /// The pending sleeps, by wake-up time. The keys are reversed so that `split_off` can
    /// detach all the sleeps that are due at a given time.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional callback that decides whether to auto-advance for a given target timestamp.
    /// Returns `true` if the clock should auto-advance to that time.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}

#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the current time and wakes up every sleep whose wake-up time is not after it.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // With reversed keys, the entries at or after `Reverse(time)` are exactly the sleeps
        // whose wake-up time is at or before `time`.
        let due = self.sleeps.split_off(&Reverse(time));
        due.into_values().flatten().for_each(|waker| {
            // Receiver may have been dropped if the sleep was cancelled.
            waker.send(()).ok();
        });
    }

    /// Registers a sleep of `delta` from the current time; see `add_sleep_until`.
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        let wake_up_time = self.time.saturating_add(delta);
        self.add_sleep_until(wake_up_time)
    }

    /// Registers a sleep until `time` and returns the receiver that is notified when the
    /// sleep is over.
    ///
    /// The sleep completes immediately if `time` is not in the future, or if the sleep
    /// callback asks to auto-advance, in which case the clock is first set to `time`.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let should_auto_advance = match &self.sleep_callback {
            Some(callback) => callback(time),
            None => false,
        };
        if should_auto_advance && time > self.time {
            // Auto-advance mode: immediately advance the clock. This makes `time` current,
            // so the sleep is completed right below.
            self.set(time);
        }
        if self.time >= time {
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
509
/// A clock implementation whose time is stored rather than read from the system, and only
/// changes when it is updated explicitly (or auto-advanced by the sleep callback). All clones
/// share the same time, and setting it in one clone updates all the others.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);

#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the stored time.
    fn current_time(&self) -> Timestamp {
        let inner = self.lock();
        inner.time
    }

    /// Waits until the clock has moved `delta` past its current time. A zero `delta`
    /// returns immediately.
    async fn sleep(&self, delta: TimeDelta) {
        if delta != TimeDelta::ZERO {
            // Register the sleep in its own statement so that the mutex guard is released
            // before awaiting.
            let wake_up = self.lock().add_sleep(delta);
            // Sender may have been dropped if the clock was dropped; just stop waiting.
            wake_up.await.ok();
        }
    }

    /// Waits until the clock has reached `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // Register the sleep in its own statement so that the mutex guard is released
        // before awaiting.
        let wake_up = self.lock().add_sleep_until(timestamp);
        // Sender may have been dropped if the clock was dropped; just stop waiting.
        wake_up.await.ok();
    }
}
539
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the current time, waking up the sleeps that are due.
    pub fn set(&self, time: Timestamp) {
        let mut inner = self.lock();
        inner.set(time);
    }

    /// Advances the current time by the specified delta, saturating at the maximum time.
    pub fn add(&self, delta: TimeDelta) {
        // Read and update under a single lock so that the advance is atomic.
        let mut inner = self.lock();
        let advanced = inner.time.saturating_add(delta);
        inner.set(advanced);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        let inner = self.lock();
        inner.time
    }

    /// Sets a callback that decides whether to auto-advance for each sleep call.
    ///
    /// The callback receives the target timestamp and should return `true` if the clock
    /// should auto-advance to that time, or `false` if the sleep should block normally.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        let mut inner = self.lock();
        inner.sleep_callback = Some(Box::new(callback));
    }

    /// Clears the sleep callback.
    pub fn clear_sleep_callback(&self) {
        let mut inner = self.lock();
        inner.sleep_callback = None;
    }

    /// Locks the shared state.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
584
585#[cfg_attr(not(web), async_trait)]
586#[cfg_attr(web, async_trait(?Send))]
587impl<Database, C> Storage for DbStorage<Database, C>
588where
589    Database: KeyValueDatabase<
590            Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
591            Error: Send + Sync,
592        > + Clone
593        + linera_base::util::traits::AutoTraits
594        + 'static,
595    C: Clock + Clone + Send + Sync + 'static,
596{
597    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
598    type Clock = C;
599    type BlockExporterContext = ViewContext<u32, Database::Store>;
600
    /// Returns the clock used by this storage.
    fn clock(&self) -> &C {
        &self.clock
    }
604
    /// Returns the shared thread pool of this storage.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
608
609    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
610    async fn load_chain(
611        &self,
612        chain_id: ChainId,
613    ) -> Result<ChainStateView<Self::Context>, ViewError> {
614        #[cfg(with_metrics)]
615        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
616        let runtime_context = ChainRuntimeContext {
617            storage: self.clone(),
618            thread_pool: self.thread_pool.clone(),
619            chain_id,
620            execution_runtime_config: self.execution_runtime_config,
621            user_contracts: self.user_contracts.clone(),
622            user_services: self.user_services.clone(),
623        };
624        let root_key = RootKey::ChainState(chain_id).bytes();
625        let store = self.database.open_exclusive(&root_key)?;
626        let context = ViewContext::create_root_context(store, runtime_context).await?;
627        ChainStateView::load(context).await
628    }
629
630    #[instrument(level = "trace", skip_all, fields(%blob_id))]
631    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
632        let root_key = RootKey::BlobId(blob_id).bytes();
633        let store = self.database.open_shared(&root_key)?;
634        let test = store.contains_key(BLOB_KEY).await?;
635        #[cfg(with_metrics)]
636        metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
637        Ok(test)
638    }
639
640    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
641    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
642        let mut missing_blobs = Vec::new();
643        for blob_id in blob_ids {
644            let root_key = RootKey::BlobId(*blob_id).bytes();
645            let store = self.database.open_shared(&root_key)?;
646            if !store.contains_key(BLOB_KEY).await? {
647                missing_blobs.push(*blob_id);
648            }
649        }
650        #[cfg(with_metrics)]
651        metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
652        Ok(missing_blobs)
653    }
654
655    #[instrument(skip_all, fields(%blob_id))]
656    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
657        let root_key = RootKey::BlobId(blob_id).bytes();
658        let store = self.database.open_shared(&root_key)?;
659        let test = store.contains_key(BLOB_STATE_KEY).await?;
660        #[cfg(with_metrics)]
661        metrics::CONTAINS_BLOB_STATE_COUNTER
662            .with_label_values(&[])
663            .inc();
664        Ok(test)
665    }
666
667    #[instrument(skip_all, fields(%hash))]
668    async fn read_confirmed_block(
669        &self,
670        hash: CryptoHash,
671    ) -> Result<Option<ConfirmedBlock>, ViewError> {
672        let root_key = RootKey::BlockHash(hash).bytes();
673        let store = self.database.open_shared(&root_key)?;
674        let value = store.read_value(BLOCK_KEY).await?;
675        #[cfg(with_metrics)]
676        metrics::READ_CONFIRMED_BLOCK_COUNTER
677            .with_label_values(&[])
678            .inc();
679        Ok(value)
680    }
681
682    #[instrument(skip_all)]
683    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
684        &self,
685        hashes: I,
686    ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError> {
687        let hashes = hashes.into_iter().collect::<Vec<_>>();
688        if hashes.is_empty() {
689            return Ok(Vec::new());
690        }
691        let root_keys = Self::get_root_keys_for_certificates(&hashes);
692        let mut blocks = Vec::new();
693        for root_key in root_keys {
694            let store = self.database.open_shared(&root_key)?;
695            blocks.push(store.read_value(BLOCK_KEY).await?);
696        }
697        #[cfg(with_metrics)]
698        metrics::READ_CONFIRMED_BLOCKS_COUNTER
699            .with_label_values(&[])
700            .inc_by(hashes.len() as u64);
701        Ok(blocks)
702    }
703
704    #[instrument(skip_all, fields(%blob_id))]
705    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
706        let root_key = RootKey::BlobId(blob_id).bytes();
707        let store = self.database.open_shared(&root_key)?;
708        let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
709        #[cfg(with_metrics)]
710        metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
711        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
712    }
713
714    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
715    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
716        if blob_ids.is_empty() {
717            return Ok(Vec::new());
718        }
719        let mut blobs = Vec::new();
720        for blob_id in blob_ids {
721            blobs.push(self.read_blob(*blob_id).await?);
722        }
723        #[cfg(with_metrics)]
724        metrics::READ_BLOB_COUNTER
725            .with_label_values(&[])
726            .inc_by(blob_ids.len() as u64);
727        Ok(blobs)
728    }
729
730    #[instrument(skip_all, fields(%blob_id))]
731    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
732        let root_key = RootKey::BlobId(blob_id).bytes();
733        let store = self.database.open_shared(&root_key)?;
734        let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
735        #[cfg(with_metrics)]
736        metrics::READ_BLOB_STATE_COUNTER
737            .with_label_values(&[])
738            .inc();
739        Ok(blob_state)
740    }
741
742    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
743    async fn read_blob_states(
744        &self,
745        blob_ids: &[BlobId],
746    ) -> Result<Vec<Option<BlobState>>, ViewError> {
747        if blob_ids.is_empty() {
748            return Ok(Vec::new());
749        }
750        let mut blob_states = Vec::new();
751        for blob_id in blob_ids {
752            blob_states.push(self.read_blob_state(*blob_id).await?);
753        }
754        #[cfg(with_metrics)]
755        metrics::READ_BLOB_STATES_COUNTER
756            .with_label_values(&[])
757            .inc_by(blob_ids.len() as u64);
758        Ok(blob_states)
759    }
760
761    #[instrument(skip_all, fields(blob_id = %blob.id()))]
762    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
763        let mut batch = MultiPartitionBatch::new();
764        batch.add_blob(blob)?;
765        self.write_batch(batch).await?;
766        Ok(())
767    }
768
769    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
770    async fn maybe_write_blob_states(
771        &self,
772        blob_ids: &[BlobId],
773        blob_state: BlobState,
774    ) -> Result<(), ViewError> {
775        if blob_ids.is_empty() {
776            return Ok(());
777        }
778        let mut maybe_blob_states = Vec::new();
779        for blob_id in blob_ids {
780            let root_key = RootKey::BlobId(*blob_id).bytes();
781            let store = self.database.open_shared(&root_key)?;
782            let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
783            maybe_blob_states.push(maybe_blob_state);
784        }
785        let mut batch = MultiPartitionBatch::new();
786        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
787            match maybe_blob_state {
788                None => {
789                    batch.add_blob_state(*blob_id, &blob_state)?;
790                }
791                Some(state) => {
792                    if state.epoch < blob_state.epoch {
793                        batch.add_blob_state(*blob_id, &blob_state)?;
794                    }
795                }
796            }
797        }
798        // We tolerate race conditions because two active chains are likely to
799        // be both from the latest epoch, and otherwise failing to pick the
800        // more recent blob state has limited impact.
801        self.write_batch(batch).await?;
802        Ok(())
803    }
804
805    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
806    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
807        if blobs.is_empty() {
808            return Ok(Vec::new());
809        }
810        let mut batch = MultiPartitionBatch::new();
811        let mut blob_states = Vec::new();
812        for blob in blobs {
813            let root_key = RootKey::BlobId(blob.id()).bytes();
814            let store = self.database.open_shared(&root_key)?;
815            let has_state = store.contains_key(BLOB_STATE_KEY).await?;
816            blob_states.push(has_state);
817            if has_state {
818                batch.add_blob(blob)?;
819            }
820        }
821        self.write_batch(batch).await?;
822        Ok(blob_states)
823    }
824
825    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
826    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
827        if blobs.is_empty() {
828            return Ok(());
829        }
830        let mut batch = MultiPartitionBatch::new();
831        for blob in blobs {
832            batch.add_blob(blob)?;
833        }
834        self.write_batch(batch).await
835    }
836
837    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
838    async fn write_blobs_and_certificate(
839        &self,
840        blobs: &[Blob],
841        certificate: &ConfirmedBlockCertificate,
842    ) -> Result<(), ViewError> {
843        let mut batch = MultiPartitionBatch::new();
844        for blob in blobs {
845            batch.add_blob(blob)?;
846        }
847        batch.add_certificate(certificate)?;
848        self.write_batch(batch).await
849    }
850
851    #[instrument(skip_all, fields(%hash))]
852    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
853        let root_key = RootKey::BlockHash(hash).bytes();
854        let store = self.database.open_shared(&root_key)?;
855        let results = store.contains_keys(&get_block_keys()).await?;
856        #[cfg(with_metrics)]
857        metrics::CONTAINS_CERTIFICATE_COUNTER
858            .with_label_values(&[])
859            .inc();
860        Ok(results[0] && results[1])
861    }
862
863    #[instrument(skip_all, fields(%hash))]
864    async fn read_certificate(
865        &self,
866        hash: CryptoHash,
867    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
868        let root_key = RootKey::BlockHash(hash).bytes();
869        let store = self.database.open_shared(&root_key)?;
870        let values = store.read_multi_values_bytes(&get_block_keys()).await?;
871        #[cfg(with_metrics)]
872        metrics::READ_CERTIFICATE_COUNTER
873            .with_label_values(&[])
874            .inc();
875        Self::deserialize_certificate(&values, hash)
876    }
877
878    #[instrument(skip_all)]
879    async fn read_certificates(
880        &self,
881        hashes: &[CryptoHash],
882    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
883        let raw_certs = self.read_certificates_raw(hashes).await?;
884
885        raw_certs
886            .into_iter()
887            .zip(hashes)
888            .map(|(maybe_raw, hash)| {
889                let Some((lite_cert_bytes, confirmed_block_bytes)) = maybe_raw else {
890                    return Ok(None);
891                };
892                let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
893                let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
894                assert_eq!(&value.hash(), hash);
895                let certificate = cert
896                    .with_value(value)
897                    .ok_or(ViewError::InconsistentEntries)?;
898                Ok(Some(certificate))
899            })
900            .collect()
901    }
902
903    #[instrument(skip_all)]
904    async fn read_certificates_raw(
905        &self,
906        hashes: &[CryptoHash],
907    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
908        if hashes.is_empty() {
909            return Ok(Vec::new());
910        }
911        let root_keys = Self::get_root_keys_for_certificates(hashes);
912        let mut values = Vec::new();
913        for root_key in root_keys {
914            let store = self.database.open_shared(&root_key)?;
915            values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
916        }
917        #[cfg(with_metrics)]
918        metrics::READ_CERTIFICATES_COUNTER
919            .with_label_values(&[])
920            .inc_by(hashes.len() as u64);
921        Ok(values
922            .chunks_exact(2)
923            .map(|chunk| {
924                let lite_cert_bytes = chunk[0].as_ref()?;
925                let confirmed_block_bytes = chunk[1].as_ref()?;
926                Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
927            })
928            .collect())
929    }
930
931    async fn read_certificate_hashes_by_heights(
932        &self,
933        chain_id: ChainId,
934        heights: &[BlockHeight],
935    ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
936        if heights.is_empty() {
937            return Ok(Vec::new());
938        }
939
940        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
941        let store = self.database.open_shared(&index_root_key)?;
942        let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
943        let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
944        let hash_options: Vec<Option<CryptoHash>> = hash_bytes
945            .into_iter()
946            .map(|opt| {
947                opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
948                    .transpose()
949            })
950            .collect::<Result<_, _>>()?;
951
952        Ok(hash_options)
953    }
954
955    #[instrument(skip_all)]
956    async fn read_certificates_by_heights_raw(
957        &self,
958        chain_id: ChainId,
959        heights: &[BlockHeight],
960    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
961        let hashes: Vec<Option<CryptoHash>> = self
962            .read_certificate_hashes_by_heights(chain_id, heights)
963            .await?;
964
965        // Map from hash to all indices in the heights array (handles duplicates)
966        let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
967        for (index, maybe_hash) in hashes.iter().enumerate() {
968            if let Some(hash) = maybe_hash {
969                indices.entry(*hash).or_default().push(index);
970            }
971        }
972
973        // Deduplicate hashes for the storage query
974        let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
975
976        let mut result = vec![None; heights.len()];
977
978        for (raw_cert, hash) in self
979            .read_certificates_raw(&unique_hashes)
980            .await?
981            .into_iter()
982            .zip(unique_hashes)
983        {
984            if let Some(idx_list) = indices.get(&hash) {
985                for &index in idx_list {
986                    result[index] = raw_cert.clone();
987                }
988            } else {
989                // This should not happen, but log a warning if it does.
990                tracing::error!(?hash, "certificate hash not found in indices map",);
991            }
992        }
993
994        Ok(result)
995    }
996
997    #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
998    async fn read_certificates_by_heights(
999        &self,
1000        chain_id: ChainId,
1001        heights: &[BlockHeight],
1002    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
1003        self.read_certificates_by_heights_raw(chain_id, heights)
1004            .await?
1005            .into_iter()
1006            .map(|maybe_raw| match maybe_raw {
1007                None => Ok(None),
1008                Some((lite_cert_bytes, confirmed_block_bytes)) => {
1009                    let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
1010                    let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
1011                    let certificate = cert
1012                        .with_value(value)
1013                        .ok_or(ViewError::InconsistentEntries)?;
1014                    Ok(Some(certificate))
1015                }
1016            })
1017            .collect()
1018    }
1019
1020    #[instrument(skip_all, fields(event_id = ?event_id))]
1021    async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
1022        let event_key = to_event_key(&event_id);
1023        let root_key = RootKey::Event(event_id.chain_id).bytes();
1024        let store = self.database.open_shared(&root_key)?;
1025        let event = store.read_value_bytes(&event_key).await?;
1026        #[cfg(with_metrics)]
1027        metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
1028        Ok(event)
1029    }
1030
1031    #[instrument(skip_all, fields(event_id = ?event_id))]
1032    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1033        let event_key = to_event_key(&event_id);
1034        let root_key = RootKey::Event(event_id.chain_id).bytes();
1035        let store = self.database.open_shared(&root_key)?;
1036        let exists = store.contains_key(&event_key).await?;
1037        #[cfg(with_metrics)]
1038        metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
1039        Ok(exists)
1040    }
1041
1042    #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
1043    async fn read_events_from_index(
1044        &self,
1045        chain_id: &ChainId,
1046        stream_id: &StreamId,
1047        start_index: u32,
1048    ) -> Result<Vec<IndexAndEvent>, ViewError> {
1049        let root_key = RootKey::Event(*chain_id).bytes();
1050        let store = self.database.open_shared(&root_key)?;
1051        let mut keys = Vec::new();
1052        let mut indices = Vec::new();
1053        let prefix = bcs::to_bytes(stream_id).unwrap();
1054        for short_key in store.find_keys_by_prefix(&prefix).await? {
1055            let index = bcs::from_bytes::<u32>(&short_key)?;
1056            if index >= start_index {
1057                let mut key = prefix.clone();
1058                key.extend(short_key);
1059                keys.push(key);
1060                indices.push(index);
1061            }
1062        }
1063        let values = store.read_multi_values_bytes(&keys).await?;
1064        let mut returned_values = Vec::new();
1065        for (index, value) in indices.into_iter().zip(values) {
1066            let event = value.unwrap();
1067            returned_values.push(IndexAndEvent { index, event });
1068        }
1069        Ok(returned_values)
1070    }
1071
1072    #[instrument(skip_all)]
1073    async fn write_events(
1074        &self,
1075        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1076    ) -> Result<(), ViewError> {
1077        let mut batch = MultiPartitionBatch::new();
1078        for (event_id, value) in events {
1079            batch.add_event(event_id, value)?;
1080        }
1081        self.write_batch(batch).await
1082    }
1083
1084    #[instrument(skip_all)]
1085    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1086        let root_key = RootKey::NetworkDescription.bytes();
1087        let store = self.database.open_shared(&root_key)?;
1088        let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1089        #[cfg(with_metrics)]
1090        metrics::READ_NETWORK_DESCRIPTION
1091            .with_label_values(&[])
1092            .inc();
1093        Ok(maybe_value)
1094    }
1095
1096    #[instrument(skip_all)]
1097    async fn write_network_description(
1098        &self,
1099        information: &NetworkDescription,
1100    ) -> Result<(), ViewError> {
1101        let mut batch = MultiPartitionBatch::new();
1102        batch.add_network_description(information)?;
1103        self.write_batch(batch).await?;
1104        Ok(())
1105    }
1106
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1110
1111    #[instrument(skip_all)]
1112    async fn block_exporter_context(
1113        &self,
1114        block_exporter_id: u32,
1115    ) -> Result<Self::BlockExporterContext, ViewError> {
1116        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1117        let store = self.database.open_exclusive(&root_key)?;
1118        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1119    }
1120
1121    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1122        let root_keys = self.database.list_root_keys().await?;
1123        let mut blob_ids = Vec::new();
1124        for root_key in root_keys {
1125            if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1126                let root_key_red = &root_key[1..];
1127                let blob_id = bcs::from_bytes(root_key_red)?;
1128                blob_ids.push(blob_id);
1129            }
1130        }
1131        Ok(blob_ids)
1132    }
1133
1134    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1135        let root_keys = self.database.list_root_keys().await?;
1136        let mut chain_ids = Vec::new();
1137        for root_key in root_keys {
1138            if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1139                let root_key_red = &root_key[1..];
1140                let chain_id = bcs::from_bytes(root_key_red)?;
1141                chain_ids.push(chain_id);
1142            }
1143        }
1144        Ok(chain_ids)
1145    }
1146
1147    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
1148        let root_keys = self.database.list_root_keys().await?;
1149        let mut event_ids = Vec::new();
1150        for root_key in root_keys {
1151            if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
1152                let root_key_red = &root_key[1..];
1153                let chain_id = bcs::from_bytes(root_key_red)?;
1154                let store = self.database.open_shared(&root_key)?;
1155                let keys = store.find_keys_by_prefix(&[]).await?;
1156                for key in keys {
1157                    let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
1158                    let event_id = EventId {
1159                        chain_id,
1160                        stream_id: restricted_event_id.stream_id,
1161                        index: restricted_event_id.index,
1162                    };
1163                    event_ids.push(event_id);
1164                }
1165            }
1166        }
1167        Ok(event_ids)
1168    }
1169}
1170
1171impl<Database, C> DbStorage<Database, C>
1172where
1173    Database: KeyValueDatabase + Clone,
1174    Database::Store: KeyValueStore + Clone,
1175    C: Clock,
1176    Database::Error: Send + Sync,
1177{
1178    #[instrument(skip_all)]
1179    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1180        hashes
1181            .iter()
1182            .map(|hash| RootKey::BlockHash(*hash).bytes())
1183            .collect()
1184    }
1185
1186    #[instrument(skip_all)]
1187    fn deserialize_certificate(
1188        pair: &[Option<Vec<u8>>],
1189        hash: CryptoHash,
1190    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1191        let Some(cert_bytes) = pair[0].as_ref() else {
1192            return Ok(None);
1193        };
1194        let Some(value_bytes) = pair[1].as_ref() else {
1195            return Ok(None);
1196        };
1197        let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1198        let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1199        assert_eq!(value.hash(), hash);
1200        let certificate = cert
1201            .with_value(value)
1202            .ok_or(ViewError::InconsistentEntries)?;
1203        Ok(Some(certificate))
1204    }
1205
1206    #[instrument(skip_all)]
1207    async fn write_entry(
1208        store: &Database::Store,
1209        key_values: Vec<(Vec<u8>, Vec<u8>)>,
1210    ) -> Result<(), ViewError> {
1211        let mut batch = Batch::new();
1212        for (key, value) in key_values {
1213            batch.put_key_value_bytes(key, value);
1214        }
1215        store.write_batch(batch).await?;
1216        Ok(())
1217    }
1218
1219    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1220    async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1221        if batch.keys_value_bytes.is_empty() {
1222            return Ok(());
1223        }
1224        let mut futures = Vec::new();
1225        for (root_key, key_values) in batch.keys_value_bytes {
1226            let store = self.database.open_shared(&root_key)?;
1227            futures.push(async move { Self::write_entry(&store, key_values).await });
1228        }
1229        futures::future::try_join_all(futures).await?;
1230        Ok(())
1231    }
1232}
1233
/// Construction and configuration that do not depend on any trait bounds.
impl<Database, C> DbStorage<Database, C> {
    /// Builds a storage around an already connected database.
    ///
    /// The database is wrapped in an `Arc`, a thread pool is created with
    /// `ThreadPool::new(20)`, the user contract and service caches start empty, and the
    /// execution runtime configuration starts from its default value.
    fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            // The `Arc` here is required on native but useless on the Web.
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Sets whether contract log messages should be output.
    ///
    /// Consumes `self` and returns it with `allow_application_logs` updated, so the
    /// call can be chained after construction.
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1255
1256impl<Database> DbStorage<Database, WallClock>
1257where
1258    Database: KeyValueDatabase + Clone + 'static,
1259    Database::Error: Send + Sync,
1260    Database::Store: KeyValueStore + Clone + 'static,
1261{
1262    pub async fn maybe_create_and_connect(
1263        config: &Database::Config,
1264        namespace: &str,
1265        wasm_runtime: Option<WasmRuntime>,
1266    ) -> Result<Self, Database::Error> {
1267        let database = Database::maybe_create_and_connect(config, namespace).await?;
1268        Ok(Self::new(database, wasm_runtime, WallClock))
1269    }
1270
1271    pub async fn connect(
1272        config: &Database::Config,
1273        namespace: &str,
1274        wasm_runtime: Option<WasmRuntime>,
1275    ) -> Result<Self, Database::Error> {
1276        let database = Database::connect(config, namespace).await?;
1277        Ok(Self::new(database, wasm_runtime, WallClock))
1278    }
1279}
1280
/// Test-only constructors for storage driven by a `TestClock`.
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a storage for tests from the database's test configuration, a freshly
    /// generated namespace and a new `TestClock`.
    ///
    /// # Panics
    ///
    /// Panics if the test configuration or the storage cannot be created.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Creates a storage for tests on `namespace`, obtaining the database through
    /// `Database::recreate_and_connect`.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        Database::recreate_and_connect(&config, namespace)
            .await
            .map(|database| Self::new(database, wasm_runtime, clock))
    }
}
1311
1312#[cfg(test)]
1313mod tests {
1314    use linera_base::{
1315        crypto::{CryptoHash, TestString},
1316        data_types::{BlockHeight, Epoch, Round, Timestamp},
1317        identifiers::{
1318            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
1319            StreamName,
1320        },
1321    };
1322    use linera_chain::{
1323        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
1324        types::ConfirmedBlockCertificate,
1325    };
1326    use linera_views::{
1327        memory::MemoryDatabase,
1328        store::{KeyValueDatabase, ReadableKeyValueStore as _},
1329    };
1330
1331    use crate::{
1332        db_storage::{
1333            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
1334            EVENT_ID_TAG,
1335        },
1336        DbStorage, Storage, TestClock,
1337    };
1338
1339    // Several functionalities of the storage rely on the way that the serialization
1340    // is done. Thus we need to check that the serialization works in the way that
1341    // we expect.
1342
1343    // The listing of the blobs in `list_blob_ids` depends on the serialization
1344    // of `RootKey::Blob`.
1345    #[test]
1346    fn test_root_key_blob_serialization() {
1347        let hash = CryptoHash::default();
1348        let blob_type = BlobType::default();
1349        let blob_id = BlobId::new(hash, blob_type);
1350        let root_key = RootKey::BlobId(blob_id).bytes();
1351        assert_eq!(root_key[0], BLOB_ID_TAG);
1352        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
1353    }
1354
1355    // The listing of the chains in `list_chain_ids` depends on the serialization
1356    // of `RootKey::ChainState`.
1357    #[test]
1358    fn test_root_key_chainstate_serialization() {
1359        let hash = CryptoHash::default();
1360        let chain_id = ChainId(hash);
1361        let root_key = RootKey::ChainState(chain_id).bytes();
1362        assert_eq!(root_key[0], CHAIN_ID_TAG);
1363        assert_eq!(
1364            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
1365            chain_id
1366        );
1367    }
1368
1369    // The listing of the events in `read_events_from_index` depends on the
1370    // serialization of `RootKey::Event`.
1371    #[test]
1372    fn test_root_key_event_serialization() {
1373        let hash = CryptoHash::test_hash("49");
1374        let chain_id = ChainId(hash);
1375        let application_description_hash = CryptoHash::test_hash("42");
1376        let application_id = ApplicationId::new(application_description_hash);
1377        let application_id = GenericApplicationId::User(application_id);
1378        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
1379        let stream_id = StreamId {
1380            application_id,
1381            stream_name,
1382        };
1383        let prefix = bcs::to_bytes(&stream_id).unwrap();
1384
1385        let index = 1567;
1386        let event_id = EventId {
1387            chain_id,
1388            stream_id,
1389            index,
1390        };
1391        let root_key = RootKey::Event(chain_id).bytes();
1392        assert_eq!(root_key[0], EVENT_ID_TAG);
1393        let key = to_event_key(&event_id);
1394        assert!(key.starts_with(&prefix));
1395    }
1396
1397    // The height index lookup depends on the serialization of RootKey::BlockByHeight
1398    // and to_height_key, following the same pattern as Event.
1399    #[test]
1400    fn test_root_key_block_by_height_serialization() {
1401        use linera_base::data_types::BlockHeight;
1402
1403        let hash = CryptoHash::default();
1404        let chain_id = ChainId(hash);
1405        let height = BlockHeight(42);
1406
1407        // RootKey::BlockByHeight uses only ChainId for partitioning (like Event)
1408        let root_key = RootKey::BlockByHeight(chain_id).bytes();
1409        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
1410        assert_eq!(deserialized_chain_id, chain_id);
1411
1412        // Height is encoded as a key (like index in Event)
1413        let height_key = to_height_key(height);
1414        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
1415        assert_eq!(deserialized_height, height);
1416    }
1417
1418    #[cfg(with_testing)]
1419    #[tokio::test]
1420    async fn test_add_certificate_creates_height_index() {
1421        // Create test storage
1422        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1423
1424        // Create a test certificate at a specific height
1425        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1426        let height = BlockHeight(5);
1427        let block = Block {
1428            header: BlockHeader {
1429                chain_id,
1430                epoch: Epoch::ZERO,
1431                height,
1432                timestamp: Timestamp::from(0),
1433                state_hash: CryptoHash::new(&TestString::new("state_hash")),
1434                previous_block_hash: None,
1435                authenticated_owner: None,
1436                transactions_hash: CryptoHash::new(&TestString::new("transactions_hash")),
1437                messages_hash: CryptoHash::new(&TestString::new("messages_hash")),
1438                previous_message_blocks_hash: CryptoHash::new(&TestString::new(
1439                    "prev_msg_blocks_hash",
1440                )),
1441                previous_event_blocks_hash: CryptoHash::new(&TestString::new(
1442                    "prev_event_blocks_hash",
1443                )),
1444                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_responses_hash")),
1445                events_hash: CryptoHash::new(&TestString::new("events_hash")),
1446                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
1447                operation_results_hash: CryptoHash::new(&TestString::new("operation_results_hash")),
1448            },
1449            body: BlockBody {
1450                transactions: vec![],
1451                messages: vec![],
1452                previous_message_blocks: Default::default(),
1453                previous_event_blocks: Default::default(),
1454                oracle_responses: vec![],
1455                events: vec![],
1456                blobs: vec![],
1457                operation_results: vec![],
1458            },
1459        };
1460        let confirmed_block = ConfirmedBlock::new(block);
1461        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1462
1463        // Write certificate
1464        let mut batch = MultiPartitionBatch::new();
1465        batch.add_certificate(&certificate).unwrap();
1466        storage.write_batch(batch).await.unwrap();
1467
1468        // Verify height index was created (following Event pattern)
1469        let hash = certificate.hash();
1470        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1471        let store = storage.database.open_shared(&index_root_key).unwrap();
1472        let height_key = to_height_key(height);
1473        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();
1474
1475        assert!(value_bytes.is_some(), "Height index was not created");
1476        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
1477        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
1478    }
1479
1480    #[cfg(with_testing)]
1481    #[tokio::test]
1482    async fn test_read_certificates_by_heights() {
1483        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1484        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1485
1486        // Write certificates at heights 1, 3, 5
1487        let mut batch = MultiPartitionBatch::new();
1488        let mut expected_certs = vec![];
1489
1490        for height in [1, 3, 5] {
1491            let block = Block {
1492                header: BlockHeader {
1493                    chain_id,
1494                    epoch: Epoch::ZERO,
1495                    height: BlockHeight(height),
1496                    timestamp: Timestamp::from(0),
1497                    state_hash: CryptoHash::new(&TestString::new("state_hash_{height}")),
1498                    previous_block_hash: None,
1499                    authenticated_owner: None,
1500                    transactions_hash: CryptoHash::new(&TestString::new("tx_hash_{height}")),
1501                    messages_hash: CryptoHash::new(&TestString::new("msg_hash_{height}")),
1502                    previous_message_blocks_hash: CryptoHash::new(&TestString::new(
1503                        "pmb_hash_{height}",
1504                    )),
1505                    previous_event_blocks_hash: CryptoHash::new(&TestString::new(
1506                        "peb_hash_{height}",
1507                    )),
1508                    oracle_responses_hash: CryptoHash::new(&TestString::new(
1509                        "oracle_hash_{height}",
1510                    )),
1511                    events_hash: CryptoHash::new(&TestString::new("events_hash_{height}")),
1512                    blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_{height}")),
1513                    operation_results_hash: CryptoHash::new(&TestString::new(
1514                        "op_results_hash_{height}",
1515                    )),
1516                },
1517                body: BlockBody {
1518                    transactions: vec![],
1519                    messages: vec![],
1520                    previous_message_blocks: Default::default(),
1521                    previous_event_blocks: Default::default(),
1522                    oracle_responses: vec![],
1523                    events: vec![],
1524                    blobs: vec![],
1525                    operation_results: vec![],
1526                },
1527            };
1528            let confirmed_block = ConfirmedBlock::new(block);
1529            let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1530            expected_certs.push((height, cert.clone()));
1531            batch.add_certificate(&cert).unwrap();
1532        }
1533        storage.write_batch(batch).await.unwrap();
1534
1535        // Test: Read in order [1, 3, 5]
1536        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
1537        let result = storage
1538            .read_certificates_by_heights(chain_id, &heights)
1539            .await
1540            .unwrap();
1541        assert_eq!(result.len(), 3);
1542        assert_eq!(
1543            result[0].as_ref().unwrap().hash(),
1544            expected_certs[0].1.hash()
1545        );
1546        assert_eq!(
1547            result[1].as_ref().unwrap().hash(),
1548            expected_certs[1].1.hash()
1549        );
1550        assert_eq!(
1551            result[2].as_ref().unwrap().hash(),
1552            expected_certs[2].1.hash()
1553        );
1554
1555        // Test: Read out of order [5, 1, 3]
1556        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
1557        let result = storage
1558            .read_certificates_by_heights(chain_id, &heights)
1559            .await
1560            .unwrap();
1561        assert_eq!(result.len(), 3);
1562        assert_eq!(
1563            result[0].as_ref().unwrap().hash(),
1564            expected_certs[2].1.hash()
1565        );
1566        assert_eq!(
1567            result[1].as_ref().unwrap().hash(),
1568            expected_certs[0].1.hash()
1569        );
1570        assert_eq!(
1571            result[2].as_ref().unwrap().hash(),
1572            expected_certs[1].1.hash()
1573        );
1574
1575        // Test: Read with missing heights [1, 2, 3]
1576        let heights = vec![
1577            BlockHeight(1),
1578            BlockHeight(2),
1579            BlockHeight(3),
1580            BlockHeight(3),
1581        ];
1582        let result = storage
1583            .read_certificates_by_heights(chain_id, &heights)
1584            .await
1585            .unwrap();
1586        assert_eq!(result.len(), 4); // BlockHeight(3) was duplicated.
1587        assert!(result[0].is_some());
1588        assert!(result[1].is_none()); // Height 2 doesn't exist
1589        assert!(result[2].is_some());
1590        assert!(result[3].is_some());
1591        assert_eq!(
1592            result[2].as_ref().unwrap().hash(),
1593            result[3].as_ref().unwrap().hash()
1594        ); // Both correspond to height 3
1595
1596        // Test: Empty heights
1597        let heights = vec![];
1598        let result = storage
1599            .read_certificates_by_heights(chain_id, &heights)
1600            .await
1601            .unwrap();
1602        assert_eq!(result.len(), 0);
1603    }
1604
1605    #[cfg(with_testing)]
1606    #[tokio::test]
1607    async fn test_read_certificates_by_heights_multiple_chains() {
1608        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1609
1610        // Create certificates for two different chains at same heights
1611        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
1612        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));
1613
1614        let mut batch = MultiPartitionBatch::new();
1615
1616        let block_a = Block {
1617            header: BlockHeader {
1618                chain_id: chain_a,
1619                epoch: Epoch::ZERO,
1620                height: BlockHeight(10),
1621                timestamp: Timestamp::from(0),
1622                state_hash: CryptoHash::new(&TestString::new("state_hash_a")),
1623                previous_block_hash: None,
1624                authenticated_owner: None,
1625                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_a")),
1626                messages_hash: CryptoHash::new(&TestString::new("msg_hash_a")),
1627                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_a")),
1628                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_a")),
1629                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_a")),
1630                events_hash: CryptoHash::new(&TestString::new("events_hash_a")),
1631                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_a")),
1632                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_a")),
1633            },
1634            body: BlockBody {
1635                transactions: vec![],
1636                messages: vec![],
1637                previous_message_blocks: Default::default(),
1638                previous_event_blocks: Default::default(),
1639                oracle_responses: vec![],
1640                events: vec![],
1641                blobs: vec![],
1642                operation_results: vec![],
1643            },
1644        };
1645        let confirmed_block_a = ConfirmedBlock::new(block_a);
1646        let cert_a = ConfirmedBlockCertificate::new(confirmed_block_a, Round::Fast, vec![]);
1647        batch.add_certificate(&cert_a).unwrap();
1648
1649        let block_b = Block {
1650            header: BlockHeader {
1651                chain_id: chain_b,
1652                epoch: Epoch::ZERO,
1653                height: BlockHeight(10),
1654                timestamp: Timestamp::from(0),
1655                state_hash: CryptoHash::new(&TestString::new("state_hash_b")),
1656                previous_block_hash: None,
1657                authenticated_owner: None,
1658                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_b")),
1659                messages_hash: CryptoHash::new(&TestString::new("msg_hash_b")),
1660                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_b")),
1661                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_b")),
1662                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_b")),
1663                events_hash: CryptoHash::new(&TestString::new("events_hash_b")),
1664                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_b")),
1665                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_b")),
1666            },
1667            body: BlockBody {
1668                transactions: vec![],
1669                messages: vec![],
1670                previous_message_blocks: Default::default(),
1671                previous_event_blocks: Default::default(),
1672                oracle_responses: vec![],
1673                events: vec![],
1674                blobs: vec![],
1675                operation_results: vec![],
1676            },
1677        };
1678        let confirmed_block_b = ConfirmedBlock::new(block_b);
1679        let cert_b = ConfirmedBlockCertificate::new(confirmed_block_b, Round::Fast, vec![]);
1680        batch.add_certificate(&cert_b).unwrap();
1681
1682        storage.write_batch(batch).await.unwrap();
1683
1684        // Read from chain A - should get cert A
1685        let result = storage
1686            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
1687            .await
1688            .unwrap();
1689        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());
1690
1691        // Read from chain B - should get cert B
1692        let result = storage
1693            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
1694            .await
1695            .unwrap();
1696        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());
1697
1698        // Read from chain A for height that only chain B has - should get None
1699        let result = storage
1700            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
1701            .await
1702            .unwrap();
1703        assert!(result[0].is_none());
1704    }
1705
1706    #[cfg(with_testing)]
1707    #[tokio::test]
1708    async fn test_read_certificates_by_heights_consistency() {
1709        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1710        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1711
1712        // Write certificate
1713        let mut batch = MultiPartitionBatch::new();
1714        let block = Block {
1715            header: BlockHeader {
1716                chain_id,
1717                epoch: Epoch::ZERO,
1718                height: BlockHeight(7),
1719                timestamp: Timestamp::from(0),
1720                state_hash: CryptoHash::new(&TestString::new("state_hash")),
1721                previous_block_hash: None,
1722                authenticated_owner: None,
1723                transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
1724                messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
1725                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
1726                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
1727                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
1728                events_hash: CryptoHash::new(&TestString::new("events_hash")),
1729                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
1730                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
1731            },
1732            body: BlockBody {
1733                transactions: vec![],
1734                messages: vec![],
1735                previous_message_blocks: Default::default(),
1736                previous_event_blocks: Default::default(),
1737                oracle_responses: vec![],
1738                events: vec![],
1739                blobs: vec![],
1740                operation_results: vec![],
1741            },
1742        };
1743        let confirmed_block = ConfirmedBlock::new(block);
1744        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1745        let hash = cert.hash();
1746        batch.add_certificate(&cert).unwrap();
1747        storage.write_batch(batch).await.unwrap();
1748
1749        // Read by hash
1750        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();
1751
1752        // Read by height
1753        let certs_by_height = storage
1754            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
1755            .await
1756            .unwrap();
1757        let cert_by_height = certs_by_height[0].as_ref().unwrap();
1758
1759        // Should be identical
1760        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
1761        assert_eq!(
1762            cert_by_hash.value().block().header,
1763            cert_by_height.value().block().header
1764        );
1765    }
1766}