linera_storage/
db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10    crypto::CryptoHash,
11    data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
12    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_chain::{
15    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
16    ChainStateView,
17};
18use linera_execution::{
19    BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
20};
21use linera_views::{
22    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
23    context::ViewContext,
24    store::{
25        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
26    },
27    views::View,
28    ViewError,
29};
30use serde::{Deserialize, Serialize};
31#[cfg(with_testing)]
32use {
33    futures::channel::oneshot::{self, Receiver},
34    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
35    std::{cmp::Reverse, collections::BTreeMap},
36};
37
38use crate::{ChainRuntimeContext, Clock, Storage};
39
#[cfg(with_metrics)]
/// Prometheus counters and histograms instrumenting the storage layer.
/// Each counter is registered lazily on first use and incremented by the
/// corresponding `DbStorage`/`Batch` operation in this file.
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// The metric counting how often a blob is tested for existence from storage
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a hashed certificate value is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// The metric counting how often blob states are read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// The metric counting how often certificates are read from storage.
    // NOTE(review): the help string below says "certificate are" — a typo, but
    // it is exported metric metadata; fixing it changes what monitoring sees.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificate are read from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(10.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// The metric counting how often an event is tested for existence from storage
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// The metric counting how often the network description is read from storage.
    // NOTE(review): the metric name "network_description" lacks the "read_"
    // prefix used by its sibling below; renaming would break existing
    // dashboards, so it is only flagged here.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
225
/// An accumulator of key/value writes that is later handed to
/// `DbStorage::write_batch` to be committed to the backing store.
#[derive(Default)]
struct Batch {
    // Pending writes in insertion order: `(serialized key, serialized value)`.
    key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
}
230
231impl Batch {
232    fn new() -> Self {
233        Self::default()
234    }
235
236    fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
237        self.key_value_bytes.push((key, value));
238    }
239
240    fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
241        let bytes = bcs::to_bytes(value)?;
242        self.key_value_bytes.push((key, bytes));
243        Ok(())
244    }
245
246    fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
247        #[cfg(with_metrics)]
248        metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
249        let blob_key = bcs::to_bytes(&BaseKey::Blob(blob.id()))?;
250        self.put_key_value_bytes(blob_key.to_vec(), blob.bytes().to_vec());
251        Ok(())
252    }
253
254    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
255        let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
256        self.put_key_value(blob_state_key.to_vec(), blob_state)?;
257        Ok(())
258    }
259
260    fn add_certificate(
261        &mut self,
262        certificate: &ConfirmedBlockCertificate,
263    ) -> Result<(), ViewError> {
264        #[cfg(with_metrics)]
265        metrics::WRITE_CERTIFICATE_COUNTER
266            .with_label_values(&[])
267            .inc();
268        let hash = certificate.hash();
269        let cert_key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
270        let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
271        self.put_key_value(cert_key.to_vec(), &certificate.lite_certificate())?;
272        self.put_key_value(block_key.to_vec(), certificate.value())?;
273        Ok(())
274    }
275
276    fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
277        #[cfg(with_metrics)]
278        metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
279        let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
280        self.put_key_value_bytes(event_key.to_vec(), value);
281        Ok(())
282    }
283
284    fn add_network_description(
285        &mut self,
286        information: &NetworkDescription,
287    ) -> Result<(), ViewError> {
288        #[cfg(with_metrics)]
289        metrics::WRITE_NETWORK_DESCRIPTION
290            .with_label_values(&[])
291            .inc();
292        let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
293        self.put_key_value(key, information)?;
294        Ok(())
295    }
296}
297
/// Main implementation of the [`Storage`] trait.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    // Shared handle to the underlying key-value database.
    database: Arc<Database>,
    // Time source: `WallClock` in production, `TestClock` in tests.
    clock: Clock,
    // Optional Wasm runtime — presumably used to compile/execute user
    // applications; not referenced in the code visible here. TODO confirm.
    wasm_runtime: Option<WasmRuntime>,
    // Shared caches of compiled user contract/service code, keyed by application.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    // Configuration forwarded to each chain's runtime context (see `load_chain`).
    execution_runtime_config: ExecutionRuntimeConfig,
}
308
/// The key space of the storage. Every key is the BCS serialization of one of
/// these variants, which starts with the one-byte variant index.
///
/// NOTE: the declaration order is load-bearing — the `INDEX_*` constants below
/// hard-code the BCS variant indices of `ChainState` (0), `Blob` (3) and
/// `Event` (5), and prefix scans rely on them. The tests in this file check
/// the correspondence; only append new variants at the end.
#[derive(Debug, Serialize, Deserialize)]
enum BaseKey {
    ChainState(ChainId),
    Certificate(CryptoHash),
    ConfirmedBlock(CryptoHash),
    Blob(BlobId),
    BlobState(BlobId),
    Event(EventId),
    BlockExporterState(u32),
    NetworkDescription,
}
320
// One-byte key prefixes equal to the BCS variant indices of
// `BaseKey::ChainState`, `BaseKey::Blob` and `BaseKey::Event`; used for
// prefix scans. Must stay in sync with the `BaseKey` declaration order
// (verified by the tests in this file).
const INDEX_CHAIN_ID: u8 = 0;
const INDEX_BLOB_ID: u8 = 3;
const INDEX_EVENT_ID: u8 = 5;
// Expected serialized lengths of a chain ID / blob ID within a key.
// NOTE(review): these assume the BCS encoding of `ChainId`/`BlobId` equals
// their in-memory size — the serialization tests below check this.
const CHAIN_ID_LENGTH: usize = std::mem::size_of::<ChainId>();
const BLOB_ID_LENGTH: usize = std::mem::size_of::<BlobId>();
326
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };

    use crate::db_storage::{
        BaseKey, BLOB_ID_LENGTH, CHAIN_ID_LENGTH, INDEX_BLOB_ID, INDEX_CHAIN_ID, INDEX_EVENT_ID,
    };

    // Several storage features depend on the exact BCS encoding of `BaseKey`,
    // so these tests pin down the serialized layout.

    // `list_blob_ids` relies on `BaseKey::Blob` serializing to the one-byte
    // variant tag followed by the fixed-size blob ID.
    #[test]
    fn test_basekey_blob_serialization() {
        let blob_id = BlobId::new(CryptoHash::default(), BlobType::default());
        let serialized = bcs::to_bytes(&BaseKey::Blob(blob_id)).expect("a key");
        assert_eq!(serialized[0], INDEX_BLOB_ID);
        assert_eq!(serialized.len(), 1 + BLOB_ID_LENGTH);
    }

    // `list_chain_ids` relies on `BaseKey::ChainState` serializing to the
    // one-byte variant tag followed by the fixed-size chain ID.
    #[test]
    fn test_basekey_chainstate_serialization() {
        let chain_id = ChainId(CryptoHash::default());
        let serialized = bcs::to_bytes(&BaseKey::ChainState(chain_id)).expect("a key");
        assert_eq!(serialized[0], INDEX_CHAIN_ID);
        assert_eq!(serialized.len(), 1 + CHAIN_ID_LENGTH);
    }

    // `read_events_from_index` relies on `BaseKey::Event` keys starting with
    // the tag, the chain ID, then the stream ID, in that order.
    #[test]
    fn test_basekey_event_serialization() {
        let chain_id = ChainId(CryptoHash::test_hash("49"));
        let application_id =
            GenericApplicationId::User(ApplicationId::new(CryptoHash::test_hash("42")));
        let stream_id = StreamId {
            application_id,
            stream_name: StreamName(bcs::to_bytes("linera_stream").unwrap()),
        };

        let mut expected_prefix = vec![INDEX_EVENT_ID];
        expected_prefix.extend(bcs::to_bytes(&chain_id).unwrap());
        expected_prefix.extend(bcs::to_bytes(&stream_id).unwrap());

        let event_id = EventId {
            chain_id,
            stream_id,
            index: 1567,
        };
        let key = bcs::to_bytes(&BaseKey::Event(event_id)).unwrap();
        assert!(key.starts_with(&expected_prefix));
    }
}
399
/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
/// chain states into the first store, and every other key space (and the
/// empty root key) into the second.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
404
405impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
406    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
407        if root_key.is_empty() {
408            return Ok(StoreInUse::Second);
409        }
410        let store = match bcs::from_bytes(root_key)? {
411            BaseKey::ChainState(_) => StoreInUse::First,
412            _ => StoreInUse::Second,
413        };
414        Ok(store)
415    }
416}
417
/// A `Clock` implementation using the system clock.
/// This is the default clock of [`DbStorage`] (see its `Clock` type parameter).
#[derive(Clone)]
pub struct WallClock;
421
422#[cfg_attr(not(web), async_trait)]
423#[cfg_attr(web, async_trait(?Send))]
424impl Clock for WallClock {
425    fn current_time(&self) -> Timestamp {
426        Timestamp::now()
427    }
428
429    async fn sleep(&self, delta: TimeDelta) {
430        linera_base::time::timer::sleep(delta.as_duration()).await
431    }
432
433    async fn sleep_until(&self, timestamp: Timestamp) {
434        let delta = timestamp.delta_since(Timestamp::now());
435        if delta > TimeDelta::ZERO {
436            self.sleep(delta).await
437        }
438    }
439}
440
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    // The current simulated time, shared by all clones of a `TestClock`.
    time: Timestamp,
    // Pending sleepers keyed by `Reverse(deadline)`, so later deadlines sort
    // first. This lets `set` use `BTreeMap::split_off(&Reverse(now))` to
    // extract exactly the sleepers whose deadline is <= now.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
}
447
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the simulated time and wakes every sleeper whose deadline has passed.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are `Reverse(deadline)`, so `split_off(&Reverse(time))` removes
        // every entry with `Reverse(deadline) >= Reverse(time)`, i.e. every
        // sleeper with `deadline <= time` — exactly the ones now due.
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            // Ignore send errors: the sleeping task may already be dropped.
            let _ = sender.send(());
        }
    }

    /// Registers a sleeper that wakes `delta` after the current simulated time.
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        self.add_sleep_until(self.time.saturating_add(delta))
    }

    /// Registers a sleeper that wakes once the clock reaches `time`. If the
    /// deadline has already passed, the receiver resolves immediately.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        if self.time >= time {
            let _ = sender.send(());
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
472
/// A clock implementation that uses a stored number of microseconds and that can be updated
/// explicitly. All clones share the same time, and setting it in one clone updates all the others.
// The shared state lives behind an `Arc<Mutex<_>>`; see `TestClock::lock`.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
478
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the simulated time shared by all clones of this clock.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the simulated time has advanced by at least `delta`.
    /// A zero delta completes immediately without registering a sleeper.
    async fn sleep(&self, delta: TimeDelta) {
        if delta == TimeDelta::ZERO {
            return;
        }
        // Register the sleeper while holding the lock, but await outside it.
        let wake_up = self.lock().add_sleep(delta);
        let _ = wake_up.await;
    }

    /// Waits until the simulated time reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        let wake_up = self.lock().add_sleep_until(timestamp);
        let _ = wake_up.await;
    }
}
500
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        Self(Arc::default())
    }

    /// Sets the current time.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by the specified delta.
    pub fn add(&self, delta: TimeDelta) {
        let mut inner = self.lock();
        let advanced = inner.time.saturating_add(delta);
        inner.set(advanced);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Acquires the shared inner state, panicking if the mutex was poisoned.
    fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
529
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl<Database, C> Storage for DbStorage<Database, C>
where
    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    C: Clock + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    // Chain states are accessed through a view context that carries the
    // chain's runtime information (see `load_chain`).
    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
    type Clock = C;
    type BlockExporterContext = ViewContext<u32, Database::Store>;

    /// Returns the clock used by this storage.
    fn clock(&self) -> &C {
        &self.clock
    }
546
547    async fn load_chain(
548        &self,
549        chain_id: ChainId,
550    ) -> Result<ChainStateView<Self::Context>, ViewError> {
551        #[cfg(with_metrics)]
552        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
553        let runtime_context = ChainRuntimeContext {
554            storage: self.clone(),
555            chain_id,
556            execution_runtime_config: self.execution_runtime_config,
557            user_contracts: self.user_contracts.clone(),
558            user_services: self.user_services.clone(),
559        };
560        let root_key = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
561        let store = self.database.open_exclusive(&root_key)?;
562        let context = ViewContext::create_root_context(store, runtime_context).await?;
563        ChainStateView::load(context).await
564    }
565
566    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
567        let store = self.database.open_shared(&[])?;
568        let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
569        let test = store.contains_key(&blob_key).await?;
570        #[cfg(with_metrics)]
571        metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
572        Ok(test)
573    }
574
575    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
576        let store = self.database.open_shared(&[])?;
577        let mut keys = Vec::new();
578        for blob_id in blob_ids {
579            let key = bcs::to_bytes(&BaseKey::Blob(*blob_id))?;
580            keys.push(key);
581        }
582        let results = store.contains_keys(keys).await?;
583        let mut missing_blobs = Vec::new();
584        for (blob_id, result) in blob_ids.iter().zip(results) {
585            if !result {
586                missing_blobs.push(*blob_id);
587            }
588        }
589        #[cfg(with_metrics)]
590        metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
591        Ok(missing_blobs)
592    }
593
594    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
595        let store = self.database.open_shared(&[])?;
596        let blob_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
597        let test = store.contains_key(&blob_key).await?;
598        #[cfg(with_metrics)]
599        metrics::CONTAINS_BLOB_STATE_COUNTER
600            .with_label_values(&[])
601            .inc();
602        Ok(test)
603    }
604
605    async fn read_confirmed_block(
606        &self,
607        hash: CryptoHash,
608    ) -> Result<Option<ConfirmedBlock>, ViewError> {
609        let store = self.database.open_shared(&[])?;
610        let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
611        let value = store.read_value(&block_key).await?;
612        #[cfg(with_metrics)]
613        metrics::READ_CONFIRMED_BLOCK_COUNTER
614            .with_label_values(&[])
615            .inc();
616        Ok(value)
617    }
618
619    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
620        let store = self.database.open_shared(&[])?;
621        let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
622        let maybe_blob_bytes = store.read_value_bytes(&blob_key).await?;
623        #[cfg(with_metrics)]
624        metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
625        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
626    }
627
628    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
629        if blob_ids.is_empty() {
630            return Ok(Vec::new());
631        }
632        let blob_keys = blob_ids
633            .iter()
634            .map(|blob_id| bcs::to_bytes(&BaseKey::Blob(*blob_id)))
635            .collect::<Result<Vec<_>, _>>()?;
636        let store = self.database.open_shared(&[])?;
637        let maybe_blob_bytes = store.read_multi_values_bytes(blob_keys).await?;
638        #[cfg(with_metrics)]
639        metrics::READ_BLOB_COUNTER
640            .with_label_values(&[])
641            .inc_by(blob_ids.len() as u64);
642
643        Ok(blob_ids
644            .iter()
645            .zip(maybe_blob_bytes)
646            .map(|(blob_id, maybe_blob_bytes)| {
647                maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(*blob_id, blob_bytes))
648            })
649            .collect())
650    }
651
652    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
653        let store = self.database.open_shared(&[])?;
654        let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
655        let blob_state = store.read_value::<BlobState>(&blob_state_key).await?;
656        #[cfg(with_metrics)]
657        metrics::READ_BLOB_STATE_COUNTER
658            .with_label_values(&[])
659            .inc();
660        Ok(blob_state)
661    }
662
663    async fn read_blob_states(
664        &self,
665        blob_ids: &[BlobId],
666    ) -> Result<Vec<Option<BlobState>>, ViewError> {
667        if blob_ids.is_empty() {
668            return Ok(Vec::new());
669        }
670        let blob_state_keys = blob_ids
671            .iter()
672            .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
673            .collect::<Result<_, _>>()?;
674        let store = self.database.open_shared(&[])?;
675        let blob_states = store
676            .read_multi_values::<BlobState>(blob_state_keys)
677            .await?;
678        #[cfg(with_metrics)]
679        metrics::READ_BLOB_STATES_COUNTER
680            .with_label_values(&[])
681            .inc_by(blob_ids.len() as u64);
682        Ok(blob_states)
683    }
684
685    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
686        let mut batch = Batch::new();
687        batch.add_blob(blob)?;
688        self.write_batch(batch).await?;
689        Ok(())
690    }
691
692    async fn maybe_write_blob_states(
693        &self,
694        blob_ids: &[BlobId],
695        blob_state: BlobState,
696    ) -> Result<(), ViewError> {
697        if blob_ids.is_empty() {
698            return Ok(());
699        }
700        let blob_state_keys = blob_ids
701            .iter()
702            .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
703            .collect::<Result<_, _>>()?;
704        let store = self.database.open_shared(&[])?;
705        let maybe_blob_states = store
706            .read_multi_values::<BlobState>(blob_state_keys)
707            .await?;
708        let mut batch = Batch::new();
709        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
710            match maybe_blob_state {
711                None => {
712                    batch.add_blob_state(*blob_id, &blob_state)?;
713                }
714                Some(state) => {
715                    if state.epoch < blob_state.epoch {
716                        batch.add_blob_state(*blob_id, &blob_state)?;
717                    }
718                }
719            }
720        }
721        // We tolerate race conditions because two active chains are likely to
722        // be both from the latest epoch, and otherwise failing to pick the
723        // more recent blob state has limited impact.
724        self.write_batch(batch).await?;
725        Ok(())
726    }
727
728    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
729        if blobs.is_empty() {
730            return Ok(Vec::new());
731        }
732        let blob_state_keys = blobs
733            .iter()
734            .map(|blob| bcs::to_bytes(&BaseKey::BlobState(blob.id())))
735            .collect::<Result<_, _>>()?;
736        let store = self.database.open_shared(&[])?;
737        let blob_states = store.contains_keys(blob_state_keys).await?;
738        let mut batch = Batch::new();
739        for (blob, has_state) in blobs.iter().zip(&blob_states) {
740            if *has_state {
741                batch.add_blob(blob)?;
742            }
743        }
744        self.write_batch(batch).await?;
745        Ok(blob_states)
746    }
747
748    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
749        if blobs.is_empty() {
750            return Ok(());
751        }
752        let mut batch = Batch::new();
753        for blob in blobs {
754            batch.add_blob(blob)?;
755        }
756        self.write_batch(batch).await
757    }
758
759    async fn write_blobs_and_certificate(
760        &self,
761        blobs: &[Blob],
762        certificate: &ConfirmedBlockCertificate,
763    ) -> Result<(), ViewError> {
764        let mut batch = Batch::new();
765        for blob in blobs {
766            batch.add_blob(blob)?;
767        }
768        batch.add_certificate(certificate)?;
769        self.write_batch(batch).await
770    }
771
772    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
773        let keys = Self::get_keys_for_certificates(&[hash])?;
774        let store = self.database.open_shared(&[])?;
775        let results = store.contains_keys(keys).await?;
776        #[cfg(with_metrics)]
777        metrics::CONTAINS_CERTIFICATE_COUNTER
778            .with_label_values(&[])
779            .inc();
780        Ok(results[0] && results[1])
781    }
782
783    async fn read_certificate(
784        &self,
785        hash: CryptoHash,
786    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
787        let store = self.database.open_shared(&[])?;
788        let keys = Self::get_keys_for_certificates(&[hash])?;
789        let values = store.read_multi_values_bytes(keys).await;
790        if values.is_ok() {
791            #[cfg(with_metrics)]
792            metrics::READ_CERTIFICATE_COUNTER
793                .with_label_values(&[])
794                .inc();
795        }
796        let values = values?;
797        Self::deserialize_certificate(&values, hash)
798    }
799
800    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
801        &self,
802        hashes: I,
803    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
804        let hashes = hashes.into_iter().collect::<Vec<_>>();
805        if hashes.is_empty() {
806            return Ok(Vec::new());
807        }
808        let keys = Self::get_keys_for_certificates(&hashes)?;
809        let store = self.database.open_shared(&[])?;
810        let values = store.read_multi_values_bytes(keys).await;
811        if values.is_ok() {
812            #[cfg(with_metrics)]
813            metrics::READ_CERTIFICATES_COUNTER
814                .with_label_values(&[])
815                .inc_by(hashes.len() as u64);
816        }
817        let values = values?;
818        let mut certificates = Vec::new();
819        for (pair, hash) in values.chunks_exact(2).zip(hashes) {
820            let certificate = Self::deserialize_certificate(pair, hash)?;
821            certificates.push(certificate);
822        }
823        Ok(certificates)
824    }
825
826    /// Reads certificates by hashes.
827    ///
828    /// Returns a vector of tuples where the first element is a lite certificate
829    /// and the second element is confirmed block.
830    ///
831    /// It does not check if all hashes all returned.
832    async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
833        &self,
834        hashes: I,
835    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
836        let hashes = hashes.into_iter().collect::<Vec<_>>();
837        if hashes.is_empty() {
838            return Ok(Vec::new());
839        }
840        let keys = Self::get_keys_for_certificates(&hashes)?;
841        let store = self.database.open_shared(&[])?;
842        let values = store.read_multi_values_bytes(keys).await?;
843        #[cfg(with_metrics)]
844        metrics::READ_CERTIFICATES_COUNTER
845            .with_label_values(&[])
846            .inc_by(hashes.len() as u64);
847        Ok(values
848            .chunks_exact(2)
849            .filter_map(|chunk| {
850                let lite_cert_bytes = chunk[0].as_ref()?;
851                let confirmed_block_bytes = chunk[1].as_ref()?;
852                Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
853            })
854            .collect())
855    }
856
857    async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
858        let store = self.database.open_shared(&[])?;
859        let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
860        let event = store.read_value_bytes(&event_key).await?;
861        #[cfg(with_metrics)]
862        metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
863        Ok(event)
864    }
865
866    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
867        let store = self.database.open_shared(&[])?;
868        let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
869        let exists = store.contains_key(&event_key).await?;
870        #[cfg(with_metrics)]
871        metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
872        Ok(exists)
873    }
874
875    async fn read_events_from_index(
876        &self,
877        chain_id: &ChainId,
878        stream_id: &StreamId,
879        start_index: u32,
880    ) -> Result<Vec<IndexAndEvent>, ViewError> {
881        let mut prefix = vec![INDEX_EVENT_ID];
882        prefix.extend(bcs::to_bytes(chain_id).unwrap());
883        prefix.extend(bcs::to_bytes(stream_id).unwrap());
884        let mut keys = Vec::new();
885        let mut indices = Vec::new();
886        let store = self.database.open_shared(&[])?;
887        for short_key in store.find_keys_by_prefix(&prefix).await? {
888            let index = bcs::from_bytes::<u32>(&short_key)?;
889            if index >= start_index {
890                let mut key = prefix.clone();
891                key.extend(short_key);
892                keys.push(key);
893                indices.push(index);
894            }
895        }
896        let values = store.read_multi_values_bytes(keys).await?;
897        let mut returned_values = Vec::new();
898        for (index, value) in indices.into_iter().zip(values) {
899            let event = value.unwrap();
900            returned_values.push(IndexAndEvent { index, event });
901        }
902        Ok(returned_values)
903    }
904
905    async fn write_events(
906        &self,
907        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
908    ) -> Result<(), ViewError> {
909        let mut batch = Batch::new();
910        for (event_id, value) in events {
911            batch.add_event(event_id, value)?;
912        }
913        self.write_batch(batch).await
914    }
915
916    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
917        let store = self.database.open_shared(&[])?;
918        let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
919        let maybe_value = store.read_value(&key).await?;
920        #[cfg(with_metrics)]
921        metrics::READ_NETWORK_DESCRIPTION
922            .with_label_values(&[])
923            .inc();
924        Ok(maybe_value)
925    }
926
927    async fn write_network_description(
928        &self,
929        information: &NetworkDescription,
930    ) -> Result<(), ViewError> {
931        let mut batch = Batch::new();
932        batch.add_network_description(information)?;
933        self.write_batch(batch).await?;
934        Ok(())
935    }
936
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
940
941    async fn block_exporter_context(
942        &self,
943        block_exporter_id: u32,
944    ) -> Result<Self::BlockExporterContext, ViewError> {
945        let root_key = bcs::to_bytes(&BaseKey::BlockExporterState(block_exporter_id))?;
946        let store = self.database.open_exclusive(&root_key)?;
947        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
948    }
949}
950
951impl<Database, C> DbStorage<Database, C>
952where
953    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
954    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
955    C: Clock,
956    Database::Error: Send + Sync,
957{
958    fn get_keys_for_certificates(hashes: &[CryptoHash]) -> Result<Vec<Vec<u8>>, ViewError> {
959        Ok(hashes
960            .iter()
961            .flat_map(|hash| {
962                let cert_key = bcs::to_bytes(&BaseKey::Certificate(*hash));
963                let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(*hash));
964                vec![cert_key, block_key]
965            })
966            .collect::<Result<_, _>>()?)
967    }
968
969    fn deserialize_certificate(
970        pair: &[Option<Vec<u8>>],
971        hash: CryptoHash,
972    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
973        let Some(cert_bytes) = pair[0].as_ref() else {
974            return Ok(None);
975        };
976        let Some(value_bytes) = pair[1].as_ref() else {
977            return Ok(None);
978        };
979        let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
980        let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
981        assert_eq!(value.hash(), hash);
982        let certificate = cert
983            .with_value(value)
984            .ok_or(ViewError::InconsistentEntries)?;
985        Ok(Some(certificate))
986    }
987
988    async fn write_entry(
989        store: &Database::Store,
990        key: Vec<u8>,
991        bytes: Vec<u8>,
992    ) -> Result<(), ViewError> {
993        let mut batch = linera_views::batch::Batch::new();
994        batch.put_key_value_bytes(key, bytes);
995        store.write_batch(batch).await?;
996        Ok(())
997    }
998
999    async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
1000        if batch.key_value_bytes.is_empty() {
1001            return Ok(());
1002        }
1003        let mut futures = Vec::new();
1004        for (key, bytes) in batch.key_value_bytes {
1005            let store = self.database.open_shared(&[])?;
1006            futures.push(async move { Self::write_entry(&store, key, bytes).await });
1007        }
1008        futures::future::try_join_all(futures).await?;
1009        Ok(())
1010    }
1011}
1012
1013impl<Database, C> DbStorage<Database, C> {
1014    fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
1015        Self {
1016            database: Arc::new(database),
1017            clock,
1018            wasm_runtime,
1019            user_contracts: Arc::new(papaya::HashMap::new()),
1020            user_services: Arc::new(papaya::HashMap::new()),
1021            execution_runtime_config: ExecutionRuntimeConfig::default(),
1022        }
1023    }
1024}
1025
1026impl<Database> DbStorage<Database, WallClock>
1027where
1028    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1029    Database::Error: Send + Sync,
1030    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
1031{
1032    pub async fn maybe_create_and_connect(
1033        config: &Database::Config,
1034        namespace: &str,
1035        wasm_runtime: Option<WasmRuntime>,
1036    ) -> Result<Self, Database::Error> {
1037        let database = Database::maybe_create_and_connect(config, namespace).await?;
1038        Ok(Self::new(database, wasm_runtime, WallClock))
1039    }
1040
1041    pub async fn connect(
1042        config: &Database::Config,
1043        namespace: &str,
1044        wasm_runtime: Option<WasmRuntime>,
1045    ) -> Result<Self, Database::Error> {
1046        let database = Database::connect(config, namespace).await?;
1047        Ok(Self::new(database, wasm_runtime, WallClock))
1048    }
1049
1050    /// Lists the blob IDs of the storage.
1051    pub async fn list_blob_ids(
1052        config: &Database::Config,
1053        namespace: &str,
1054    ) -> Result<Vec<BlobId>, ViewError> {
1055        let database = Database::maybe_create_and_connect(config, namespace).await?;
1056        let store = database.open_shared(&[])?;
1057        let prefix = &[INDEX_BLOB_ID];
1058        let keys = store.find_keys_by_prefix(prefix).await?;
1059        let mut blob_ids = Vec::new();
1060        for key in keys {
1061            let key_red = &key[..BLOB_ID_LENGTH];
1062            let blob_id = bcs::from_bytes(key_red)?;
1063            blob_ids.push(blob_id);
1064        }
1065        Ok(blob_ids)
1066    }
1067}
1068
1069impl<Database> DbStorage<Database, WallClock>
1070where
1071    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1072    Database::Error: Send + Sync,
1073{
1074    /// Lists the chain IDs of the storage.
1075    pub async fn list_chain_ids(
1076        config: &Database::Config,
1077        namespace: &str,
1078    ) -> Result<Vec<ChainId>, ViewError> {
1079        let root_keys = Database::list_root_keys(config, namespace).await?;
1080        let mut chain_ids = Vec::new();
1081        for root_key in root_keys {
1082            if root_key.len() == 1 + CHAIN_ID_LENGTH && root_key[0] == INDEX_CHAIN_ID {
1083                let root_key_red = &root_key[1..=CHAIN_ID_LENGTH];
1084                let chain_id = bcs::from_bytes(root_key_red)?;
1085                chain_ids.push(chain_id);
1086            }
1087        }
1088        Ok(chain_ids)
1089    }
1090}
1091
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a storage over a freshly generated test namespace.
    ///
    /// Panics if the test configuration or the connection cannot be set up.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Creates a storage for testing, recreating `namespace` from scratch.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        Ok(Self::new(database, wasm_runtime, clock))
    }
}