// linera_storage/db_storage.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! The database-backed implementation of the [`Storage`] trait ([`DbStorage`]),
//! together with the key layout and the clock implementations used by the storage layer.

use std::{fmt::Debug, sync::Arc};

use async_trait::async_trait;
use dashmap::DashMap;
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{
    crypto::CryptoHash,
    data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
};
use linera_chain::{
    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
    ChainStateView,
};
use linera_execution::{
    BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
};
use linera_views::{
    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
    context::ViewContext,
    store::{AdminKeyValueStore, KeyIterable as _, KeyValueStore},
    views::View,
    ViewError,
};
use serde::{Deserialize, Serialize};
#[cfg(with_testing)]
use {
    futures::channel::oneshot::{self, Receiver},
    linera_views::{random::generate_test_namespace, store::TestKeyValueStore},
    std::{cmp::Reverse, collections::BTreeMap},
};

use crate::{ChainRuntimeContext, Clock, Storage};

#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// The metric counting how often a blob is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// The metric counting how often blob states are read from storage.
    #[doc(hidden)]
    pub static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(10.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// The metric counting how often an event is tested for existence from storage.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// The metric counting how often the network description is read from storage.
    #[doc(hidden)]
    pub static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}

/// A batch of key/value pairs staged in memory before being written to the store.
#[derive(Default)]
struct Batch {
    key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
}

impl Batch {
    fn new() -> Self {
        Self::default()
    }

    fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.key_value_bytes.push((key, value));
    }

    fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
        let bytes = bcs::to_bytes(value)?;
        self.key_value_bytes.push((key, bytes));
        Ok(())
    }

    fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
        let blob_key = bcs::to_bytes(&BaseKey::Blob(blob.id()))?;
        self.put_key_value_bytes(blob_key, blob.bytes().to_vec());
        Ok(())
    }

    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
        let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
        self.put_key_value(blob_state_key, blob_state)?;
        Ok(())
    }

    fn add_certificate(
        &mut self,
        certificate: &ConfirmedBlockCertificate,
    ) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        metrics::WRITE_CERTIFICATE_COUNTER
            .with_label_values(&[])
            .inc();
        let hash = certificate.hash();
        let cert_key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
        let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
        self.put_key_value(cert_key, &certificate.lite_certificate())?;
        self.put_key_value(block_key, certificate.value())?;
        Ok(())
    }

    fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
        let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
        self.put_key_value_bytes(event_key, value);
        Ok(())
    }

    fn add_network_description(
        &mut self,
        information: &NetworkDescription,
    ) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        metrics::WRITE_NETWORK_DESCRIPTION
            .with_label_values(&[])
            .inc();
        let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
        self.put_key_value(key, information)?;
        Ok(())
    }
}

/// Main implementation of the [`Storage`] trait.
#[derive(Clone)]
pub struct DbStorage<Store, Clock = WallClock> {
    store: Arc<Store>,
    clock: Clock,
    wasm_runtime: Option<WasmRuntime>,
    user_contracts: Arc<DashMap<ApplicationId, UserContractCode>>,
    user_services: Arc<DashMap<ApplicationId, UserServiceCode>>,
    execution_runtime_config: ExecutionRuntimeConfig,
}

/// The base keys, serialized with BCS, under which the different kinds of data are stored.
#[derive(Debug, Serialize, Deserialize)]
enum BaseKey {
    ChainState(ChainId),
    Certificate(CryptoHash),
    ConfirmedBlock(CryptoHash),
    Blob(BlobId),
    BlobState(BlobId),
    Event(EventId),
    BlockExporterState(u32),
    NetworkDescription,
}

// These constants are the BCS variant indices of the corresponding `BaseKey` variants
// (`ChainState` = 0, `Blob` = 3, `Event` = 5): BCS encodes an enum value as a one-byte
// variant index (for enums with fewer than 128 variants) followed by the variant's
// fields. The tests below check that this layout matches what `list_chain_ids`,
// `list_blob_ids` and `read_events_from_index` assume.
const INDEX_CHAIN_ID: u8 = 0;
const INDEX_BLOB_ID: u8 = 3;
const INDEX_EVENT_ID: u8 = 5;
const CHAIN_ID_LENGTH: usize = std::mem::size_of::<ChainId>();
const BLOB_ID_LENGTH: usize = std::mem::size_of::<BlobId>();

#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };

    use crate::db_storage::{
        BaseKey, BLOB_ID_LENGTH, CHAIN_ID_LENGTH, INDEX_BLOB_ID, INDEX_CHAIN_ID, INDEX_EVENT_ID,
    };

    // Several storage functionalities rely on the exact serialization format of the
    // keys, so we check that `BaseKey` values are serialized the way we expect.

    // The listing of the blobs in `list_blob_ids` depends on the serialization
    // of `BaseKey::Blob`.
    #[test]
    fn test_basekey_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let base_key = BaseKey::Blob(blob_id);
        let key = bcs::to_bytes(&base_key).expect("a key");
        assert_eq!(key[0], INDEX_BLOB_ID);
        assert_eq!(key.len(), 1 + BLOB_ID_LENGTH);
    }

    // The listing of the chains in `list_chain_ids` depends on the serialization
    // of `BaseKey::ChainState`.
    #[test]
    fn test_basekey_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let base_key = BaseKey::ChainState(chain_id);
        let key = bcs::to_bytes(&base_key).expect("a key");
        assert_eq!(key[0], INDEX_CHAIN_ID);
        assert_eq!(key.len(), 1 + CHAIN_ID_LENGTH);
    }

    // Reading events in `read_events_from_index` depends on the serialization
    // of `BaseKey::Event`.
    #[test]
    fn test_basekey_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let mut prefix = vec![INDEX_EVENT_ID];
        prefix.extend(bcs::to_bytes(&chain_id).unwrap());
        prefix.extend(bcs::to_bytes(&stream_id).unwrap());

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        let base_key = BaseKey::Event(event_id);
        let key = bcs::to_bytes(&base_key).unwrap();
        assert!(key.starts_with(&prefix));
    }
}

/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
/// chain states in the first store.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;

impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
        if root_key.is_empty() {
            return Ok(StoreInUse::Second);
        }
        let store = match bcs::from_bytes(root_key)? {
            BaseKey::ChainState(_) => StoreInUse::First,
            _ => StoreInUse::Second,
        };
        Ok(store)
    }
}
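
// Illustration: a root key produced by `bcs::to_bytes(&BaseKey::ChainState(chain_id))`
// is routed to the first store, while the empty root key and root keys for all other
// `BaseKey` variants end up in the second store.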

/// A `Clock` implementation using the system clock.
#[derive(Clone)]
pub struct WallClock;

#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for WallClock {
    fn current_time(&self) -> Timestamp {
        Timestamp::now()
    }

    async fn sleep(&self, delta: TimeDelta) {
        linera_base::time::timer::sleep(delta.as_duration()).await
    }

    async fn sleep_until(&self, timestamp: Timestamp) {
        let delta = timestamp.delta_since(Timestamp::now());
        if delta > TimeDelta::ZERO {
            self.sleep(delta).await
        }
    }
}

#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    time: Timestamp,
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
}

#[cfg(with_testing)]
impl TestClockInner {
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            let _ = sender.send(());
        }
    }

    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        self.add_sleep_until(self.time.saturating_add(delta))
    }

    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        if self.time >= time {
            let _ = sender.send(());
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}

/// A clock implementation that uses a stored number of microseconds and that can be updated
/// explicitly. All clones share the same time, and setting it in one clone updates all the others.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);

#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    async fn sleep(&self, delta: TimeDelta) {
        if delta == TimeDelta::ZERO {
            return;
        }
        let receiver = self.lock().add_sleep(delta);
        let _ = receiver.await;
    }

    async fn sleep_until(&self, timestamp: Timestamp) {
        let receiver = self.lock().add_sleep_until(timestamp);
        let _ = receiver.await;
    }
}

#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by the specified delta.
    pub fn add(&self, delta: TimeDelta) {
        let mut guard = self.lock();
        let time = guard.time.saturating_add(delta);
        guard.set(time);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
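
// A usage sketch (not part of the original code): all clones of a `TestClock` share
// the same underlying time, so advancing one clone is observed by all the others.
// This assumes `Timestamp: From<u64>` (microseconds since the Unix epoch):
//
//     let clock = TestClock::new();
//     let observer = clock.clone();
//     clock.set(Timestamp::from(1_000_000)); // one second after the Unix epoch
//     assert_eq!(observer.current_time(), Timestamp::from(1_000_000));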

#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl<Store, C> Storage for DbStorage<Store, C>
where
    Store: KeyValueStore + Clone + Send + Sync + 'static,
    C: Clock + Clone + Send + Sync + 'static,
    Store::Error: Send + Sync,
{
    type Context = ViewContext<ChainRuntimeContext<Self>, Store>;
    type Clock = C;
    type BlockExporterContext = ViewContext<u32, Store>;

    fn clock(&self) -> &C {
        &self.clock
    }

    async fn load_chain(
        &self,
        chain_id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, ViewError> {
        #[cfg(with_metrics)]
        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
        let runtime_context = ChainRuntimeContext {
            storage: self.clone(),
            chain_id,
            execution_runtime_config: self.execution_runtime_config,
            user_contracts: self.user_contracts.clone(),
            user_services: self.user_services.clone(),
        };
        let root_key = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
        let store = self.store.open_exclusive(&root_key)?;
        let context = ViewContext::create_root_context(store, runtime_context).await?;
        ChainStateView::load(context).await
    }

    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
        let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
        let test = self.store.contains_key(&blob_key).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
        Ok(test)
    }

    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
        let mut keys = Vec::new();
        for blob_id in blob_ids {
            let key = bcs::to_bytes(&BaseKey::Blob(*blob_id))?;
            keys.push(key);
        }
        let results = self.store.contains_keys(keys).await?;
        let mut missing_blobs = Vec::new();
        for (blob_id, result) in blob_ids.iter().zip(results) {
            if !result {
                missing_blobs.push(*blob_id);
            }
        }
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
        Ok(missing_blobs)
    }

    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
        let blob_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
        let test = self.store.contains_key(&blob_key).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOB_STATE_COUNTER
            .with_label_values(&[])
            .inc();
        Ok(test)
    }

    async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, ViewError> {
        let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
        let maybe_value = self.store.read_value::<ConfirmedBlock>(&block_key).await?;
        #[cfg(with_metrics)]
        metrics::READ_CONFIRMED_BLOCK_COUNTER
            .with_label_values(&[])
            .inc();
        let value = maybe_value.ok_or_else(|| ViewError::not_found("value for hash", hash))?;
        Ok(value)
    }

    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
        let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
        let maybe_blob_bytes = self.store.read_value_bytes(&blob_key).await?;
        #[cfg(with_metrics)]
        metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
    }

    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
        if blob_ids.is_empty() {
            return Ok(Vec::new());
        }
        let blob_keys = blob_ids
            .iter()
            .map(|blob_id| bcs::to_bytes(&BaseKey::Blob(*blob_id)))
            .collect::<Result<Vec<_>, _>>()?;
        let maybe_blob_bytes = self.store.read_multi_values_bytes(blob_keys).await?;
        #[cfg(with_metrics)]
        metrics::READ_BLOB_COUNTER
            .with_label_values(&[])
            .inc_by(blob_ids.len() as u64);

        Ok(blob_ids
            .iter()
            .zip(maybe_blob_bytes)
            .map(|(blob_id, maybe_blob_bytes)| {
                maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(*blob_id, blob_bytes))
            })
            .collect())
    }

    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
        let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
        let blob_state = self.store.read_value::<BlobState>(&blob_state_key).await?;
        #[cfg(with_metrics)]
        metrics::READ_BLOB_STATE_COUNTER
            .with_label_values(&[])
            .inc();
        Ok(blob_state)
    }

    async fn read_blob_states(
        &self,
        blob_ids: &[BlobId],
    ) -> Result<Vec<Option<BlobState>>, ViewError> {
        if blob_ids.is_empty() {
            return Ok(Vec::new());
        }
        let blob_state_keys = blob_ids
            .iter()
            .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
            .collect::<Result<_, _>>()?;
        let blob_states = self
            .store
            .read_multi_values::<BlobState>(blob_state_keys)
            .await?;
        #[cfg(with_metrics)]
        metrics::READ_BLOB_STATES_COUNTER
            .with_label_values(&[])
            .inc_by(blob_ids.len() as u64);
        Ok(blob_states)
    }

    async fn read_confirmed_blocks_downward(
        &self,
        from: CryptoHash,
        limit: u32,
    ) -> Result<Vec<ConfirmedBlock>, ViewError> {
        let mut hash = Some(from);
        let mut values = Vec::new();
        for _ in 0..limit {
            let Some(next_hash) = hash else {
                break;
            };
            let value = self.read_confirmed_block(next_hash).await?;
            hash = value.block().header.previous_block_hash;
            values.push(value);
        }
        Ok(values)
    }

    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.add_blob(blob)?;
        self.write_batch(batch).await?;
        Ok(())
    }

    async fn maybe_write_blob_states(
        &self,
        blob_ids: &[BlobId],
        blob_state: BlobState,
    ) -> Result<(), ViewError> {
        if blob_ids.is_empty() {
            return Ok(());
        }
        let blob_state_keys = blob_ids
            .iter()
            .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
            .collect::<Result<_, _>>()?;
        let maybe_blob_states = self
            .store
            .read_multi_values::<BlobState>(blob_state_keys)
            .await?;
        let mut batch = Batch::new();
        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
            match maybe_blob_state {
                None => {
                    batch.add_blob_state(*blob_id, &blob_state)?;
                }
                Some(state) => {
                    if state.epoch < blob_state.epoch {
                        batch.add_blob_state(*blob_id, &blob_state)?;
                    }
                }
            }
        }
        // We tolerate race conditions between this read and the write below: two active
        // chains are likely both to be from the latest epoch, and otherwise failing to
        // pick the more recent blob state has limited impact.
        self.write_batch(batch).await?;
        Ok(())
    }

    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
        if blobs.is_empty() {
            return Ok(Vec::new());
        }
        let blob_state_keys = blobs
            .iter()
            .map(|blob| bcs::to_bytes(&BaseKey::BlobState(blob.id())))
            .collect::<Result<_, _>>()?;
        let blob_states = self.store.contains_keys(blob_state_keys).await?;
        // Only write blobs whose blob state is already present. The returned vector
        // records, for each input blob, whether its state was found.
        let mut batch = Batch::new();
        for (blob, has_state) in blobs.iter().zip(&blob_states) {
            if *has_state {
                batch.add_blob(blob)?;
            }
        }
        self.write_batch(batch).await?;
        Ok(blob_states)
    }

    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
        if blobs.is_empty() {
            return Ok(());
        }
        let mut batch = Batch::new();
        for blob in blobs {
            batch.add_blob(blob)?;
        }
        self.write_batch(batch).await
    }

    async fn write_blobs_and_certificate(
        &self,
        blobs: &[Blob],
        certificate: &ConfirmedBlockCertificate,
    ) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        for blob in blobs {
            batch.add_blob(blob)?;
        }
        batch.add_certificate(certificate)?;
        self.write_batch(batch).await
    }

    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
        let keys = Self::get_keys_for_certificates(&[hash])?;
        let results = self.store.contains_keys(keys).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_CERTIFICATE_COUNTER
            .with_label_values(&[])
            .inc();
        // The keys are the certificate key followed by the confirmed-block key:
        // both entries must be present.
        Ok(results[0] && results[1])
    }

    async fn read_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, ViewError> {
        let keys = Self::get_keys_for_certificates(&[hash])?;
        let values = self.store.read_multi_values_bytes(keys).await;
        if values.is_ok() {
            #[cfg(with_metrics)]
            metrics::READ_CERTIFICATE_COUNTER
                .with_label_values(&[])
                .inc();
        }
        let values = values?;
        Self::deserialize_certificate(&values, hash)
    }

    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<ConfirmedBlockCertificate>, ViewError> {
        let hashes = hashes.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        let keys = Self::get_keys_for_certificates(&hashes)?;
        let values = self.store.read_multi_values_bytes(keys).await;
        if values.is_ok() {
            #[cfg(with_metrics)]
            metrics::READ_CERTIFICATES_COUNTER
                .with_label_values(&[])
                .inc_by(hashes.len() as u64);
        }
        let values = values?;
        let mut certificates = Vec::new();
        for (pair, hash) in values.chunks_exact(2).zip(hashes) {
            let certificate = Self::deserialize_certificate(pair, hash)?;
            certificates.push(certificate);
        }
        Ok(certificates)
    }

    async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
        let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
        let event = self.store.read_value_bytes(&event_key).await?;
        #[cfg(with_metrics)]
        metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
        Ok(event)
    }

    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
        let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
        let exists = self.store.contains_key(&event_key).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
        Ok(exists)
    }

    async fn read_events_from_index(
        &self,
        chain_id: &ChainId,
        stream_id: &StreamId,
        start_index: u32,
    ) -> Result<Vec<IndexAndEvent>, ViewError> {
        // Event keys have the form `[INDEX_EVENT_ID] ++ bcs(chain_id) ++ bcs(stream_id) ++ bcs(index)`,
        // so the short keys returned by `find_keys_by_prefix` are the serialized indices.
        let mut prefix = vec![INDEX_EVENT_ID];
        prefix.extend(bcs::to_bytes(chain_id).unwrap());
        prefix.extend(bcs::to_bytes(stream_id).unwrap());
        let mut keys = Vec::new();
        let mut indices = Vec::new();
        for short_key in self.store.find_keys_by_prefix(&prefix).await?.iterator() {
            let short_key = short_key?;
            let index = bcs::from_bytes::<u32>(short_key)?;
            if index >= start_index {
                let mut key = prefix.clone();
                key.extend(short_key);
                keys.push(key);
                indices.push(index);
            }
        }
        let values = self.store.read_multi_values_bytes(keys).await?;
        let mut returned_values = Vec::new();
        for (index, value) in indices.into_iter().zip(values) {
            let event = value.unwrap();
            returned_values.push(IndexAndEvent { index, event });
        }
        Ok(returned_values)
    }

    async fn write_events(
        &self,
        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
    ) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        for (event_id, value) in events {
            batch.add_event(event_id, value)?;
        }
        self.write_batch(batch).await
    }

    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
        let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
        let maybe_value = self.store.read_value(&key).await?;
        #[cfg(with_metrics)]
        metrics::READ_NETWORK_DESCRIPTION
            .with_label_values(&[])
            .inc();
        Ok(maybe_value)
    }

    async fn write_network_description(
        &self,
        information: &NetworkDescription,
    ) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.add_network_description(information)?;
        self.write_batch(batch).await?;
        Ok(())
    }

    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }

    async fn block_exporter_context(
        &self,
        block_exporter_id: u32,
    ) -> Result<Self::BlockExporterContext, ViewError> {
        let root_key = bcs::to_bytes(&BaseKey::BlockExporterState(block_exporter_id))?;
        let store = self.store.open_exclusive(&root_key)?;
        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
    }
}
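
// A usage sketch (not part of the original code): a typical read path through the
// `Storage` API above, assuming a `storage: DbStorage<_, _>` value and a certificate
// `hash` of type `CryptoHash`.
//
//     if storage.contains_certificate(hash).await? {
//         let certificate = storage.read_certificate(hash).await?;
//         let blocks = storage.read_confirmed_blocks_downward(hash, 10).await?;
//     }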

impl<Store, C> DbStorage<Store, C>
where
    Store: KeyValueStore + Clone + Send + Sync + 'static,
    C: Clock,
    Store::Error: Send + Sync,
{
    /// Returns, for each hash, its certificate key followed by its confirmed-block key.
    /// `deserialize_certificate` and `read_certificates` rely on this pairing.
    fn get_keys_for_certificates(hashes: &[CryptoHash]) -> Result<Vec<Vec<u8>>, ViewError> {
        Ok(hashes
            .iter()
            .flat_map(|hash| {
                let cert_key = bcs::to_bytes(&BaseKey::Certificate(*hash));
                let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(*hash));
                vec![cert_key, block_key]
            })
            .collect::<Result<_, _>>()?)
    }

    fn deserialize_certificate(
        pair: &[Option<Vec<u8>>],
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, ViewError> {
        let cert_bytes = pair[0]
            .as_ref()
            .ok_or_else(|| ViewError::not_found("certificate bytes for hash", hash))?;
        let value_bytes = pair[1]
            .as_ref()
            .ok_or_else(|| ViewError::not_found("value bytes for hash", hash))?;
        let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
        let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
        assert_eq!(value.hash(), hash);
        let certificate = cert
            .with_value(value)
            .ok_or(ViewError::InconsistentEntries)?;
        Ok(certificate)
    }

    async fn write_entry(store: &Store, key: Vec<u8>, bytes: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = linera_views::batch::Batch::new();
        batch.put_key_value_bytes(key, bytes);
        store.write_batch(batch).await?;
        Ok(())
    }

    /// Writes all staged entries, issuing one store batch per key/value pair and
    /// awaiting them concurrently.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
        if batch.key_value_bytes.is_empty() {
            return Ok(());
        }
        let mut futures = Vec::new();
        for (key, bytes) in batch.key_value_bytes.into_iter() {
            let store = self.store.clone();
            futures.push(async move { Self::write_entry(&store, key, bytes).await });
        }
        futures::future::try_join_all(futures).await?;
        Ok(())
    }

    fn new(store: Store, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
        Self {
            store: Arc::new(store),
            clock,
            wasm_runtime,
            user_contracts: Arc::new(DashMap::new()),
            user_services: Arc::new(DashMap::new()),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }
}

impl<Store> DbStorage<Store, WallClock>
where
    Store: KeyValueStore + Clone + Send + Sync + 'static,
    Store::Error: Send + Sync,
{
    /// Connects to the given namespace, creating it if it does not exist yet.
    pub async fn maybe_create_and_connect(
        config: &Store::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
    ) -> Result<Self, Store::Error> {
        let store = Store::maybe_create_and_connect(config, namespace).await?;
        Ok(Self::new(store, wasm_runtime, WallClock))
    }

    /// Connects to an existing namespace.
    pub async fn connect(
        config: &Store::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
    ) -> Result<Self, Store::Error> {
        let store = Store::connect(config, namespace).await?;
        Ok(Self::new(store, wasm_runtime, WallClock))
    }

    /// Lists the blob IDs of the storage.
    pub async fn list_blob_ids(
        config: &Store::Config,
        namespace: &str,
    ) -> Result<Vec<BlobId>, ViewError> {
        let store = Store::maybe_create_and_connect(config, namespace).await?;
        let prefix = &[INDEX_BLOB_ID];
        let keys = store.find_keys_by_prefix(prefix).await?;
        let mut blob_ids = Vec::new();
        for key in keys.iterator() {
            let key = key?;
            let key_red = &key[..BLOB_ID_LENGTH];
            let blob_id = bcs::from_bytes(key_red)?;
            blob_ids.push(blob_id);
        }
        Ok(blob_ids)
    }
}
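
// A usage sketch with hypothetical names: opening a storage instance over a concrete
// key-value backend `MyStore` and listing the blobs it contains.
//
//     let storage =
//         DbStorage::<MyStore, WallClock>::connect(&config, "my-namespace", None).await?;
//     let blob_ids =
//         DbStorage::<MyStore, WallClock>::list_blob_ids(&config, "my-namespace").await?;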

impl<Store> DbStorage<Store, WallClock>
where
    Store: AdminKeyValueStore + Clone + Send + Sync + 'static,
    Store::Error: Send + Sync,
{
    /// Lists the chain IDs of the storage.
    pub async fn list_chain_ids(
        config: &Store::Config,
        namespace: &str,
    ) -> Result<Vec<ChainId>, ViewError> {
        let root_keys = Store::list_root_keys(config, namespace).await?;
        let mut chain_ids = Vec::new();
        for root_key in root_keys {
            if root_key.len() == 1 + CHAIN_ID_LENGTH && root_key[0] == INDEX_CHAIN_ID {
                let root_key_red = &root_key[1..1 + CHAIN_ID_LENGTH];
                let chain_id = bcs::from_bytes(root_key_red)?;
                chain_ids.push(chain_id);
            }
        }
        Ok(chain_ids)
    }
}

#[cfg(with_testing)]
impl<Store> DbStorage<Store, TestClock>
where
    Store: TestKeyValueStore + Clone + Send + Sync + 'static,
    Store::Error: Send + Sync,
{
    /// Creates a test storage over a freshly generated namespace, using a [`TestClock`].
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Store::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        DbStorage::<Store, TestClock>::new_for_testing(
            config,
            &namespace,
            wasm_runtime,
            TestClock::new(),
        )
        .await
        .unwrap()
    }

    /// Creates a test storage by re-creating the given namespace and connecting to it.
    pub async fn new_for_testing(
        config: Store::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Store::Error> {
        let store = Store::recreate_and_connect(&config, namespace).await?;
        Ok(Self::new(store, wasm_runtime, clock))
    }
}