linera_storage/
db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10    crypto::CryptoHash,
11    data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
12    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_chain::{
15    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
16    ChainStateView,
17};
18use linera_execution::{
19    BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
20};
21use linera_views::{
22    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
23    batch::Batch,
24    context::ViewContext,
25    store::{
26        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
27    },
28    views::View,
29    ViewError,
30};
31use serde::{Deserialize, Serialize};
32use tracing::instrument;
33#[cfg(with_testing)]
34use {
35    futures::channel::oneshot::{self, Receiver},
36    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
37    std::{cmp::Reverse, collections::BTreeMap},
38};
39
40use crate::{ChainRuntimeContext, Clock, Storage};
41
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// The metric counting how often a blob is tested for existence from storage
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a hashed certificate value is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// The metric counting how often blob states are read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            // Fixed grammar: was "how often certificate are read".
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(10.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// The metric counting how often an event is tested for existence from storage
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// The metric counting how often the network description is read from storage.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            // NOTE(review): this metric name lacks the `read_` prefix used by
            // every other read counter (cf. `write_network_description`).
            // Renaming would break existing dashboards, so it is kept as is;
            // confirm before normalizing.
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
227
/// The default key used when the root_key contains the information.
const DEFAULT_KEY: &[u8] = &[0];

/// The second key used when the root_key contains the information.
const ONE_KEY: &[u8] = &[1];

/// Returns the two per-partition keys, `[DEFAULT_KEY, ONE_KEY]`, as owned
/// vectors, in that order (callers index the returned values positionally).
fn get_01_keys() -> Vec<Vec<u8>> {
    // Use the named constants instead of repeating the raw byte literals, so
    // the keys cannot drift from `DEFAULT_KEY` / `ONE_KEY`.
    vec![DEFAULT_KEY.to_vec(), ONE_KEY.to_vec()]
}
237
238#[derive(Default)]
239struct MultiPartitionBatch {
240    keys_value_bytes: Vec<(Vec<u8>, Vec<u8>, Vec<u8>)>,
241}
242
243impl MultiPartitionBatch {
244    fn new() -> Self {
245        Self::default()
246    }
247
248    fn put_key_value_bytes(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
249        self.keys_value_bytes.push((root_key, key, value));
250    }
251
252    fn put_key_value<T: Serialize>(
253        &mut self,
254        root_key: Vec<u8>,
255        value: &T,
256    ) -> Result<(), ViewError> {
257        let bytes = bcs::to_bytes(value)?;
258        let key = DEFAULT_KEY.to_vec();
259        self.keys_value_bytes.push((root_key, key, bytes));
260        Ok(())
261    }
262
263    fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
264        #[cfg(with_metrics)]
265        metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
266        let root_key = RootKey::Blob(blob.id()).bytes();
267        let key = DEFAULT_KEY.to_vec();
268        self.put_key_value_bytes(root_key, key, blob.bytes().to_vec());
269        Ok(())
270    }
271
272    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
273        let root_key = RootKey::Blob(blob_id).bytes();
274        let key = ONE_KEY.to_vec();
275        let value = bcs::to_bytes(blob_state)?;
276        self.put_key_value_bytes(root_key, key, value);
277        Ok(())
278    }
279
280    fn add_certificate(
281        &mut self,
282        certificate: &ConfirmedBlockCertificate,
283    ) -> Result<(), ViewError> {
284        #[cfg(with_metrics)]
285        metrics::WRITE_CERTIFICATE_COUNTER
286            .with_label_values(&[])
287            .inc();
288        let hash = certificate.hash();
289        let root_key = RootKey::CryptoHash(hash).bytes();
290        self.put_key_value(root_key.clone(), &certificate.lite_certificate())?;
291        let value = bcs::to_bytes(&certificate.value())?;
292        self.put_key_value_bytes(root_key, ONE_KEY.to_vec(), value);
293        Ok(())
294    }
295
296    fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
297        #[cfg(with_metrics)]
298        metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
299        let key = event_key(&event_id);
300        let root_key = RootKey::Event(event_id.chain_id).bytes();
301        self.put_key_value_bytes(root_key, key, value);
302        Ok(())
303    }
304
305    fn add_network_description(
306        &mut self,
307        information: &NetworkDescription,
308    ) -> Result<(), ViewError> {
309        #[cfg(with_metrics)]
310        metrics::WRITE_NETWORK_DESCRIPTION
311            .with_label_values(&[])
312            .inc();
313        let root_key = RootKey::NetworkDescription.bytes();
314        self.put_key_value(root_key, information)?;
315        Ok(())
316    }
317}
318
/// Main implementation of the [`Storage`] trait.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    // Shared handle to the underlying key-value database.
    database: Arc<Database>,
    // Time source; `WallClock` in production, `TestClock` in tests.
    clock: Clock,
    // Which Wasm runtime to use for user applications, if any.
    wasm_runtime: Option<WasmRuntime>,
    // In-memory caches of compiled user contract/service code, shared
    // between clones of this storage.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    execution_runtime_config: ExecutionRuntimeConfig,
}
329
/// The partition (root key) under which each category of data is stored.
///
/// NOTE: the BCS serialization of an enum starts with its variant index, so
/// the variant ORDER here is part of the on-disk format: `ChainState` must
/// stay at index 0 (`CHAIN_ID_TAG`) and `Blob` at index 2 (`BLOB_ID_TAG`).
/// Do not reorder or insert variants without a storage migration.
#[derive(Debug, Serialize, Deserialize)]
enum RootKey {
    ChainState(ChainId),
    CryptoHash(CryptoHash),
    Blob(BlobId),
    Event(ChainId),
    BlockExporterState(u32),
    NetworkDescription,
}

impl RootKey {
    /// Returns the BCS serialization of this key.
    fn bytes(&self) -> Vec<u8> {
        // BCS serialization of these plain data types cannot fail.
        bcs::to_bytes(self).unwrap()
    }
}
345
346fn event_key(event_id: &EventId) -> Vec<u8> {
347    let mut key = bcs::to_bytes(&event_id.stream_id).unwrap();
348    key.extend(bcs::to_bytes(&event_id.index).unwrap());
349    key
350}
351
352fn is_chain_state(root_key: &[u8]) -> bool {
353    if root_key.is_empty() {
354        return false;
355    }
356    root_key[0] == CHAIN_ID_TAG
357}
358
359const CHAIN_ID_TAG: u8 = 0;
360const BLOB_ID_TAG: u8 = 2;
361const CHAIN_ID_LENGTH: usize = std::mem::size_of::<ChainId>();
362const BLOB_ID_LENGTH: usize = std::mem::size_of::<BlobId>();
363
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };

    use crate::db_storage::{
        event_key, RootKey, BLOB_ID_LENGTH, BLOB_ID_TAG, CHAIN_ID_LENGTH, CHAIN_ID_TAG,
    };

    // Several functionalities of the storage rely on the way that the serialization
    // is done. Thus we need to check that the serialization works in the way that
    // we expect.

    // The listing of the blobs in `list_blob_ids` depends on the serialization
    // of `RootKey::Blob`.
    #[test]
    fn test_root_key_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let root_key = RootKey::Blob(blob_id).bytes();
        // The first byte is the BCS variant tag; the rest is the fixed-size id.
        assert_eq!(root_key[0], BLOB_ID_TAG);
        assert_eq!(root_key.len(), 1 + BLOB_ID_LENGTH);
    }

    // The listing of the chains in `list_chain_ids` depends on the serialization
    // of `RootKey::ChainState`.
    #[test]
    fn test_root_key_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let root_key = RootKey::ChainState(chain_id).bytes();
        // The first byte is the BCS variant tag; the rest is the fixed-size id.
        assert_eq!(root_key[0], CHAIN_ID_TAG);
        assert_eq!(root_key.len(), 1 + CHAIN_ID_LENGTH);
    }

    // The listing of the events in `read_events_from_index` depends on the
    // serialization of `BaseKey::Event`.
    #[test]
    fn test_root_key_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let prefix = bcs::to_bytes(&stream_id).unwrap();

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        // Event keys must be prefixed by the stream id so that a prefix scan
        // over a stream returns its events in index order.
        let key = event_key(&event_id);
        assert!(key.starts_with(&prefix));
    }
}
431
432/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
433/// chain states into the first store.
434#[derive(Clone, Copy)]
435pub struct ChainStatesFirstAssignment;
436
437impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
438    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
439        if root_key.is_empty() {
440            return Ok(StoreInUse::Second);
441        }
442        let store = match is_chain_state(root_key) {
443            true => StoreInUse::First,
444            false => StoreInUse::Second,
445        };
446        Ok(store)
447    }
448}
449
450/// A `Clock` implementation using the system clock.
451#[derive(Clone)]
452pub struct WallClock;
453
454#[cfg_attr(not(web), async_trait)]
455#[cfg_attr(web, async_trait(?Send))]
456impl Clock for WallClock {
457    fn current_time(&self) -> Timestamp {
458        Timestamp::now()
459    }
460
461    async fn sleep(&self, delta: TimeDelta) {
462        linera_base::time::timer::sleep(delta.as_duration()).await
463    }
464
465    async fn sleep_until(&self, timestamp: Timestamp) {
466        let delta = timestamp.delta_since(Timestamp::now());
467        if delta > TimeDelta::ZERO {
468            self.sleep(delta).await
469        }
470    }
471}
472
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    // The simulated current time.
    time: Timestamp,
    // Pending sleepers keyed by wake-up time. Keys are wrapped in `Reverse`
    // so that `set` can use `split_off` to extract exactly the entries whose
    // deadline is <= the new time (see `set` below).
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
}

#[cfg(with_testing)]
impl TestClockInner {
    // Advances the clock to `time` and wakes every sleeper whose deadline
    // has been reached.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Because keys are `Reverse<Timestamp>`, `split_off(&Reverse(time))`
        // removes all entries with key >= Reverse(time), i.e. all deadlines
        // <= time, leaving the still-pending sleepers in `self.sleeps`.
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            // The receiver may already be dropped; ignore send failures.
            let _ = sender.send(());
        }
    }

    // Registers a sleeper that wakes once the clock has advanced by `delta`.
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        self.add_sleep_until(self.time.saturating_add(delta))
    }

    // Registers a sleeper that wakes once the clock reaches `time`. If that
    // moment has already passed, the receiver is completed immediately.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        if self.time >= time {
            let _ = sender.send(());
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
504
/// A clock implementation that uses a stored number of microseconds and that can be updated
/// explicitly. All clones share the same time, and setting it in one clone updates all the others.
// The shared state (time + pending sleepers) lives behind an `Arc<Mutex<_>>`.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
510
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the simulated current time.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the clock has been advanced by at least `delta`; a zero
    /// delta completes immediately.
    async fn sleep(&self, delta: TimeDelta) {
        if delta != TimeDelta::ZERO {
            // Register first, then drop the guard before awaiting.
            let receiver = self.lock().add_sleep(delta);
            let _ = receiver.await;
        }
    }

    /// Waits until the clock has been set to `timestamp` or later.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // Register first, then drop the guard before awaiting.
        let receiver = self.lock().add_sleep_until(timestamp);
        let _ = receiver.await;
    }
}
532
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        Self(Arc::default())
    }

    /// Sets the current time, waking any sleepers whose deadline is reached.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by the specified delta (saturating).
    pub fn add(&self, delta: TimeDelta) {
        let mut inner = self.lock();
        let target = inner.time.saturating_add(delta);
        inner.set(target);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Acquires the shared inner state.
    fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
561
562#[cfg_attr(not(web), async_trait)]
563#[cfg_attr(web, async_trait(?Send))]
564impl<Database, C> Storage for DbStorage<Database, C>
565where
566    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
567    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
568    C: Clock + Clone + Send + Sync + 'static,
569    Database::Error: Send + Sync,
570{
571    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
572    type Clock = C;
573    type BlockExporterContext = ViewContext<u32, Database::Store>;
574
    // Gives access to the storage's time source.
    fn clock(&self) -> &C {
        &self.clock
    }

    // Loads the chain state view of `chain_id` from its dedicated partition.
    // NOTE(review): the partition is opened with `open_exclusive`, unlike the
    // shared opens used for blobs/certificates — presumably because a chain
    // state is only loaded by its owning worker; confirm against callers.
    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
    async fn load_chain(
        &self,
        chain_id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, ViewError> {
        // Records the load latency for the whole body via the guard's drop.
        #[cfg(with_metrics)]
        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
        let runtime_context = ChainRuntimeContext {
            storage: self.clone(),
            chain_id,
            execution_runtime_config: self.execution_runtime_config,
            user_contracts: self.user_contracts.clone(),
            user_services: self.user_services.clone(),
        };
        let root_key = RootKey::ChainState(chain_id).bytes();
        let store = self.database.open_exclusive(&root_key)?;
        let context = ViewContext::create_root_context(store, runtime_context).await?;
        ChainStateView::load(context).await
    }
598
599    #[instrument(level = "trace", skip_all, fields(blob_id = %blob_id))]
600    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
601        let root_key = RootKey::Blob(blob_id).bytes();
602        let store = self.database.open_shared(&root_key)?;
603        let test = store.contains_key(DEFAULT_KEY).await?;
604        #[cfg(with_metrics)]
605        metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
606        Ok(test)
607    }
608
609    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
610    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
611        let mut missing_blobs = Vec::new();
612        for blob_id in blob_ids {
613            let root_key = RootKey::Blob(*blob_id).bytes();
614            let store = self.database.open_shared(&root_key)?;
615            let test = store.contains_key(DEFAULT_KEY).await?;
616            if !test {
617                missing_blobs.push(*blob_id);
618            }
619        }
620        #[cfg(with_metrics)]
621        metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
622        Ok(missing_blobs)
623    }
624
625    #[instrument(skip_all, fields(blob_id = %blob_id))]
626    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
627        let root_key = RootKey::Blob(blob_id).bytes();
628        let store = self.database.open_shared(&root_key)?;
629        let test = store.contains_key(ONE_KEY).await?;
630        #[cfg(with_metrics)]
631        metrics::CONTAINS_BLOB_STATE_COUNTER
632            .with_label_values(&[])
633            .inc();
634        Ok(test)
635    }
636
637    #[instrument(skip_all, fields(hash = %hash))]
638    async fn read_confirmed_block(
639        &self,
640        hash: CryptoHash,
641    ) -> Result<Option<ConfirmedBlock>, ViewError> {
642        let root_key = RootKey::CryptoHash(hash).bytes();
643        let store = self.database.open_shared(&root_key)?;
644        let value = store.read_value(ONE_KEY).await?;
645        #[cfg(with_metrics)]
646        metrics::READ_CONFIRMED_BLOCK_COUNTER
647            .with_label_values(&[])
648            .inc();
649        Ok(value)
650    }
651
652    #[instrument(skip_all, fields(blob_id = %blob_id))]
653    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
654        let root_key = RootKey::Blob(blob_id).bytes();
655        let store = self.database.open_shared(&root_key)?;
656        let maybe_blob_bytes = store.read_value_bytes(DEFAULT_KEY).await?;
657        #[cfg(with_metrics)]
658        metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
659        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
660    }
661
662    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
663    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
664        if blob_ids.is_empty() {
665            return Ok(Vec::new());
666        }
667        let mut blobs = Vec::new();
668        for blob_id in blob_ids {
669            blobs.push(self.read_blob(*blob_id).await?);
670        }
671        #[cfg(with_metrics)]
672        metrics::READ_BLOB_COUNTER
673            .with_label_values(&[])
674            .inc_by(blob_ids.len() as u64);
675        Ok(blobs)
676    }
677
678    #[instrument(skip_all, fields(blob_id = %blob_id))]
679    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
680        let root_key = RootKey::Blob(blob_id).bytes();
681        let store = self.database.open_shared(&root_key)?;
682        let blob_state = store.read_value::<BlobState>(ONE_KEY).await?;
683        #[cfg(with_metrics)]
684        metrics::READ_BLOB_STATE_COUNTER
685            .with_label_values(&[])
686            .inc();
687        Ok(blob_state)
688    }
689
690    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
691    async fn read_blob_states(
692        &self,
693        blob_ids: &[BlobId],
694    ) -> Result<Vec<Option<BlobState>>, ViewError> {
695        if blob_ids.is_empty() {
696            return Ok(Vec::new());
697        }
698        let mut blob_states = Vec::new();
699        for blob_id in blob_ids {
700            blob_states.push(self.read_blob_state(*blob_id).await?);
701        }
702        #[cfg(with_metrics)]
703        metrics::READ_BLOB_STATES_COUNTER
704            .with_label_values(&[])
705            .inc_by(blob_ids.len() as u64);
706        Ok(blob_states)
707    }
708
709    #[instrument(skip_all, fields(blob_id = %blob.id()))]
710    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
711        let mut batch = MultiPartitionBatch::new();
712        batch.add_blob(blob)?;
713        self.write_batch(batch).await?;
714        Ok(())
715    }
716
717    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
718    async fn maybe_write_blob_states(
719        &self,
720        blob_ids: &[BlobId],
721        blob_state: BlobState,
722    ) -> Result<(), ViewError> {
723        if blob_ids.is_empty() {
724            return Ok(());
725        }
726        let mut maybe_blob_states = Vec::new();
727        for blob_id in blob_ids {
728            let root_key = RootKey::Blob(*blob_id).bytes();
729            let store = self.database.open_shared(&root_key)?;
730            let maybe_blob_state = store.read_value::<BlobState>(ONE_KEY).await?;
731            maybe_blob_states.push(maybe_blob_state);
732        }
733        let mut batch = MultiPartitionBatch::new();
734        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
735            match maybe_blob_state {
736                None => {
737                    batch.add_blob_state(*blob_id, &blob_state)?;
738                }
739                Some(state) => {
740                    if state.epoch < blob_state.epoch {
741                        batch.add_blob_state(*blob_id, &blob_state)?;
742                    }
743                }
744            }
745        }
746        // We tolerate race conditions because two active chains are likely to
747        // be both from the latest epoch, and otherwise failing to pick the
748        // more recent blob state has limited impact.
749        self.write_batch(batch).await?;
750        Ok(())
751    }
752
    // Writes each blob ONLY if its blob state (under `ONE_KEY`) already
    // exists, and returns, per blob, whether that state was present.
    // NOTE(review): writing the payload only when a state exists looks
    // intentional (only persist blobs that have been validated), but confirm
    // against the `Storage` trait's contract for this method.
    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
        if blobs.is_empty() {
            return Ok(Vec::new());
        }
        let mut batch = MultiPartitionBatch::new();
        let mut blob_states = Vec::new();
        for blob in blobs {
            let root_key = RootKey::Blob(blob.id()).bytes();
            let store = self.database.open_shared(&root_key)?;
            let has_state = store.contains_key(ONE_KEY).await?;
            blob_states.push(has_state);
            if has_state {
                batch.add_blob(blob)?;
            }
        }
        self.write_batch(batch).await?;
        Ok(blob_states)
    }
772
773    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
774    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
775        if blobs.is_empty() {
776            return Ok(());
777        }
778        let mut batch = MultiPartitionBatch::new();
779        for blob in blobs {
780            batch.add_blob(blob)?;
781        }
782        self.write_batch(batch).await
783    }
784
785    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
786    async fn write_blobs_and_certificate(
787        &self,
788        blobs: &[Blob],
789        certificate: &ConfirmedBlockCertificate,
790    ) -> Result<(), ViewError> {
791        let mut batch = MultiPartitionBatch::new();
792        for blob in blobs {
793            batch.add_blob(blob)?;
794        }
795        batch.add_certificate(certificate)?;
796        self.write_batch(batch).await
797    }
798
799    #[instrument(skip_all, fields(hash = %hash))]
800    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
801        let root_key = RootKey::CryptoHash(hash).bytes();
802        let store = self.database.open_shared(&root_key)?;
803        let results = store.contains_keys(get_01_keys()).await?;
804        #[cfg(with_metrics)]
805        metrics::CONTAINS_CERTIFICATE_COUNTER
806            .with_label_values(&[])
807            .inc();
808        Ok(results[0] && results[1])
809    }
810
811    #[instrument(skip_all, fields(hash = %hash))]
812    async fn read_certificate(
813        &self,
814        hash: CryptoHash,
815    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
816        let root_key = RootKey::CryptoHash(hash).bytes();
817        let store = self.database.open_shared(&root_key)?;
818        let values = store.read_multi_values_bytes(get_01_keys()).await?;
819        #[cfg(with_metrics)]
820        metrics::READ_CERTIFICATE_COUNTER
821            .with_label_values(&[])
822            .inc();
823        Self::deserialize_certificate(&values, hash)
824    }
825
826    #[instrument(skip_all)]
827    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
828        &self,
829        hashes: I,
830    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
831        let hashes = hashes.into_iter().collect::<Vec<_>>();
832        if hashes.is_empty() {
833            return Ok(Vec::new());
834        }
835        let root_keys = Self::get_root_keys_for_certificates(&hashes);
836        let mut values = Vec::new();
837        for root_key in root_keys {
838            let store = self.database.open_shared(&root_key)?;
839            values.extend(store.read_multi_values_bytes(get_01_keys()).await?);
840        }
841        #[cfg(with_metrics)]
842        metrics::READ_CERTIFICATES_COUNTER
843            .with_label_values(&[])
844            .inc_by(hashes.len() as u64);
845        let mut certificates = Vec::new();
846        for (pair, hash) in values.chunks_exact(2).zip(hashes) {
847            let certificate = Self::deserialize_certificate(pair, hash)?;
848            certificates.push(certificate);
849        }
850        Ok(certificates)
851    }
852
    /// Reads certificates by hashes.
    ///
    /// Returns a vector of tuples where the first element is a lite certificate
    /// and the second element is the confirmed block.
    ///
    /// It does not check that all hashes are returned: pairs with either part
    /// missing are silently dropped, so the result may be shorter than the
    /// input.
    #[instrument(skip_all)]
    async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let hashes = hashes.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        let root_keys = Self::get_root_keys_for_certificates(&hashes);
        let mut values = Vec::new();
        for root_key in root_keys {
            let store = self.database.open_shared(&root_key)?;
            values.extend(store.read_multi_values_bytes(get_01_keys()).await?);
        }
        #[cfg(with_metrics)]
        metrics::READ_CERTIFICATES_COUNTER
            .with_label_values(&[])
            .inc_by(hashes.len() as u64);
        // Each consecutive pair of values belongs to one hash; incomplete
        // pairs (either part missing) are filtered out.
        Ok(values
            .chunks_exact(2)
            .filter_map(|chunk| {
                let lite_cert_bytes = chunk[0].as_ref()?;
                let confirmed_block_bytes = chunk[1].as_ref()?;
                Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
            })
            .collect())
    }
887
888    #[instrument(skip_all, fields(event_id = ?event_id))]
889    async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
890        let event_key = event_key(&event_id);
891        let root_key = RootKey::Event(event_id.chain_id).bytes();
892        let store = self.database.open_shared(&root_key)?;
893        let event = store.read_value_bytes(&event_key).await?;
894        #[cfg(with_metrics)]
895        metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
896        Ok(event)
897    }
898
899    #[instrument(skip_all, fields(event_id = ?event_id))]
900    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
901        let event_key = event_key(&event_id);
902        let root_key = RootKey::Event(event_id.chain_id).bytes();
903        let store = self.database.open_shared(&root_key)?;
904        let exists = store.contains_key(&event_key).await?;
905        #[cfg(with_metrics)]
906        metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
907        Ok(exists)
908    }
909
910    #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
911    async fn read_events_from_index(
912        &self,
913        chain_id: &ChainId,
914        stream_id: &StreamId,
915        start_index: u32,
916    ) -> Result<Vec<IndexAndEvent>, ViewError> {
917        let root_key = RootKey::Event(*chain_id).bytes();
918        let store = self.database.open_shared(&root_key)?;
919        let mut keys = Vec::new();
920        let mut indices = Vec::new();
921        let prefix = bcs::to_bytes(stream_id).unwrap();
922        for short_key in store.find_keys_by_prefix(&prefix).await? {
923            let index = bcs::from_bytes::<u32>(&short_key)?;
924            if index >= start_index {
925                let mut key = prefix.clone();
926                key.extend(short_key);
927                keys.push(key);
928                indices.push(index);
929            }
930        }
931        let values = store.read_multi_values_bytes(keys).await?;
932        let mut returned_values = Vec::new();
933        for (index, value) in indices.into_iter().zip(values) {
934            let event = value.unwrap();
935            returned_values.push(IndexAndEvent { index, event });
936        }
937        Ok(returned_values)
938    }
939
940    #[instrument(skip_all)]
941    async fn write_events(
942        &self,
943        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
944    ) -> Result<(), ViewError> {
945        let mut batch = MultiPartitionBatch::new();
946        for (event_id, value) in events {
947            batch.add_event(event_id, value)?;
948        }
949        self.write_batch(batch).await
950    }
951
952    #[instrument(skip_all)]
953    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
954        let root_key = RootKey::NetworkDescription.bytes();
955        let store = self.database.open_shared(&root_key)?;
956        let maybe_value = store.read_value(DEFAULT_KEY).await?;
957        #[cfg(with_metrics)]
958        metrics::READ_NETWORK_DESCRIPTION
959            .with_label_values(&[])
960            .inc();
961        Ok(maybe_value)
962    }
963
964    #[instrument(skip_all)]
965    async fn write_network_description(
966        &self,
967        information: &NetworkDescription,
968    ) -> Result<(), ViewError> {
969        let mut batch = MultiPartitionBatch::new();
970        batch.add_network_description(information)?;
971        self.write_batch(batch).await?;
972        Ok(())
973    }
974
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
978
979    #[instrument(skip_all)]
980    async fn block_exporter_context(
981        &self,
982        block_exporter_id: u32,
983    ) -> Result<Self::BlockExporterContext, ViewError> {
984        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
985        let store = self.database.open_exclusive(&root_key)?;
986        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
987    }
988}
989
990impl<Database, C> DbStorage<Database, C>
991where
992    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
993    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
994    C: Clock,
995    Database::Error: Send + Sync,
996{
997    #[instrument(skip_all)]
998    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
999        hashes
1000            .iter()
1001            .map(|hash| RootKey::CryptoHash(*hash).bytes())
1002            .collect()
1003    }
1004
1005    #[instrument(skip_all)]
1006    fn deserialize_certificate(
1007        pair: &[Option<Vec<u8>>],
1008        hash: CryptoHash,
1009    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1010        let Some(cert_bytes) = pair[0].as_ref() else {
1011            return Ok(None);
1012        };
1013        let Some(value_bytes) = pair[1].as_ref() else {
1014            return Ok(None);
1015        };
1016        let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1017        let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1018        assert_eq!(value.hash(), hash);
1019        let certificate = cert
1020            .with_value(value)
1021            .ok_or(ViewError::InconsistentEntries)?;
1022        Ok(Some(certificate))
1023    }
1024
1025    #[instrument(skip_all)]
1026    async fn write_entry(
1027        store: &Database::Store,
1028        key: Vec<u8>,
1029        bytes: Vec<u8>,
1030    ) -> Result<(), ViewError> {
1031        let mut batch = Batch::new();
1032        batch.put_key_value_bytes(key, bytes);
1033        store.write_batch(batch).await?;
1034        Ok(())
1035    }
1036
1037    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1038    async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1039        if batch.keys_value_bytes.is_empty() {
1040            return Ok(());
1041        }
1042        let mut futures = Vec::new();
1043        for (root_key, key, bytes) in batch.keys_value_bytes {
1044            let store = self.database.open_shared(&root_key)?;
1045            futures.push(async move { Self::write_entry(&store, key, bytes).await });
1046        }
1047        futures::future::try_join_all(futures).await?;
1048        Ok(())
1049    }
1050}
1051
impl<Database, C> DbStorage<Database, C> {
    /// Creates a `DbStorage` wrapping `database`, with empty user contract and
    /// service caches and the default execution runtime configuration.
    fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }
}
1064
1065impl<Database> DbStorage<Database, WallClock>
1066where
1067    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1068    Database::Error: Send + Sync,
1069    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
1070{
1071    pub async fn maybe_create_and_connect(
1072        config: &Database::Config,
1073        namespace: &str,
1074        wasm_runtime: Option<WasmRuntime>,
1075    ) -> Result<Self, Database::Error> {
1076        let database = Database::maybe_create_and_connect(config, namespace).await?;
1077        Ok(Self::new(database, wasm_runtime, WallClock))
1078    }
1079
1080    pub async fn connect(
1081        config: &Database::Config,
1082        namespace: &str,
1083        wasm_runtime: Option<WasmRuntime>,
1084    ) -> Result<Self, Database::Error> {
1085        let database = Database::connect(config, namespace).await?;
1086        Ok(Self::new(database, wasm_runtime, WallClock))
1087    }
1088
1089    /// Lists the blob IDs of the storage.
1090    pub async fn list_blob_ids(
1091        config: &Database::Config,
1092        namespace: &str,
1093    ) -> Result<Vec<BlobId>, ViewError> {
1094        let database = Database::connect(config, namespace).await?;
1095        let root_keys = database.list_root_keys().await?;
1096        let mut blob_ids = Vec::new();
1097        for root_key in root_keys {
1098            if root_key.len() == 1 + BLOB_ID_LENGTH && root_key[0] == BLOB_ID_TAG {
1099                let root_key_red = &root_key[1..=BLOB_ID_LENGTH];
1100                let blob_id = bcs::from_bytes(root_key_red)?;
1101                blob_ids.push(blob_id);
1102            }
1103        }
1104        Ok(blob_ids)
1105    }
1106}
1107
1108impl<Database> DbStorage<Database, WallClock>
1109where
1110    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1111    Database::Error: Send + Sync,
1112{
1113    /// Lists the chain IDs of the storage.
1114    pub async fn list_chain_ids(
1115        config: &Database::Config,
1116        namespace: &str,
1117    ) -> Result<Vec<ChainId>, ViewError> {
1118        let database = Database::connect(config, namespace).await?;
1119        let root_keys = database.list_root_keys().await?;
1120        let mut chain_ids = Vec::new();
1121        for root_key in root_keys {
1122            if root_key.len() == 1 + CHAIN_ID_LENGTH && root_key[0] == CHAIN_ID_TAG {
1123                let root_key_red = &root_key[1..=CHAIN_ID_LENGTH];
1124                let chain_id = bcs::from_bytes(root_key_red)?;
1125                chain_ids.push(chain_id);
1126            }
1127        }
1128        Ok(chain_ids)
1129    }
1130}
1131
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Builds a storage over a freshly generated test namespace.
    ///
    /// # Panics
    /// Panics if the test database cannot be created; acceptable in tests.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        Self::new_for_testing(config, &namespace, wasm_runtime, TestClock::new())
            .await
            .unwrap()
    }

    /// Recreates `namespace` from scratch and wraps it with the given test clock.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        Ok(Self::new(database, wasm_runtime, clock))
    }
}