linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6mod db_storage;
7
8use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc};
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13    crypto::CryptoHash,
14    data_types::{
15        ApplicationDescription, Blob, ChainDescription, CompressedBytecode, Epoch,
16        NetworkDescription, TimeDelta, Timestamp,
17    },
18    identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
19    vm::VmRuntime,
20};
21use linera_chain::{
22    types::{ConfirmedBlock, ConfirmedBlockCertificate},
23    ChainError, ChainStateView,
24};
25use linera_execution::{
26    committee::Committee, system::EPOCH_STREAM_NAME, BlobState, ExecutionError,
27    ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker, UserContractCode,
28    UserServiceCode, WasmRuntime,
29};
30#[cfg(with_revm)]
31use linera_execution::{
32    evm::revm::{EvmContractModule, EvmServiceModule},
33    EvmRuntime,
34};
35#[cfg(with_wasm_runtime)]
36use linera_execution::{WasmContractModule, WasmServiceModule};
37use linera_views::{context::Context, views::RootView, ViewError};
38
39#[cfg(with_metrics)]
40pub use crate::db_storage::metrics;
41#[cfg(with_testing)]
42pub use crate::db_storage::TestClock;
43pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
44
/// The default namespace to be used when none is specified.
pub const DEFAULT_NAMESPACE: &str = "table_linera";
47
48/// Communicate with a persistent storage using the "views" abstraction.
49#[cfg_attr(not(web), async_trait)]
50#[cfg_attr(web, async_trait(?Send))]
51pub trait Storage: Sized {
52    /// The low-level storage implementation in use by the core protocol (chain workers etc).
53    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;
54
55    /// The clock type being used.
56    type Clock: Clock;
57
58    /// The low-level storage implementation in use by the block exporter.
59    type BlockExporterContext: Context<Extra = u32> + Clone + Send + Sync + 'static;
60
61    /// Returns the current wall clock time.
62    fn clock(&self) -> &Self::Clock;
63
64    /// Loads the view of a chain state.
65    ///
66    /// # Notes
67    ///
68    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
69    /// instances of the same chain active at any given moment, they will race to access persistent
70    /// storage. This can lead to invalid states and data corruption.
71    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
72
73    /// Tests the existence of a blob with the given blob ID.
74    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
75
76    /// Returns what blobs from the input are missing from storage.
77    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
78
79    /// Tests existence of a blob state with the given blob ID.
80    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
81
82    /// Reads the hashed certificate value with the given hash.
83    async fn read_confirmed_block(
84        &self,
85        hash: CryptoHash,
86    ) -> Result<Option<ConfirmedBlock>, ViewError>;
87
88    /// Reads the blob with the given blob ID.
89    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
90
91    /// Reads the blobs with the given blob IDs.
92    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
93
94    /// Reads the blob state with the given blob ID.
95    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
96
97    /// Reads the blob states with the given blob IDs.
98    async fn read_blob_states(
99        &self,
100        blob_ids: &[BlobId],
101    ) -> Result<Vec<Option<BlobState>>, ViewError>;
102
103    /// Writes the given blob.
104    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
105
106    /// Writes blobs and certificate
107    async fn write_blobs_and_certificate(
108        &self,
109        blobs: &[Blob],
110        certificate: &ConfirmedBlockCertificate,
111    ) -> Result<(), ViewError>;
112
113    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
114    /// blobs that were written.
115    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
116
117    /// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
118    async fn maybe_write_blob_states(
119        &self,
120        blob_ids: &[BlobId],
121        blob_state: BlobState,
122    ) -> Result<(), ViewError>;
123
124    /// Writes several blobs.
125    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
126
127    /// Tests existence of the certificate with the given hash.
128    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
129
130    /// Reads the certificate with the given hash.
131    async fn read_certificate(
132        &self,
133        hash: CryptoHash,
134    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
135
136    /// Reads a number of certificates
137    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
138        &self,
139        hashes: I,
140    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
141
142    /// Reads certificates by hashes.
143    ///
144    /// Returns a vector of tuples where the first element is a lite certificate
145    /// and the second element is confirmed block.
146    ///
147    /// It does not check if all hashes all returned.
148    async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
149        &self,
150        hashes: I,
151    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError>;
152
153    /// Reads the event with the given ID.
154    async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
155
156    /// Tests existence of the event with the given ID.
157    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
158
159    /// Lists all the events from a starting index
160    async fn read_events_from_index(
161        &self,
162        chain_id: &ChainId,
163        stream_id: &StreamId,
164        start_index: u32,
165    ) -> Result<Vec<IndexAndEvent>, ViewError>;
166
167    /// Writes a vector of events.
168    async fn write_events(
169        &self,
170        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
171    ) -> Result<(), ViewError>;
172
173    /// Reads the network description.
174    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
175
176    /// Writes the network description.
177    async fn write_network_description(
178        &self,
179        information: &NetworkDescription,
180    ) -> Result<(), ViewError>;
181
182    /// Returns a map of the committees for the given epochs.
183    async fn committees_for(
184        &self,
185        epoch_range: RangeInclusive<Epoch>,
186    ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
187        // Short-circuit for an empty input range.
188        if epoch_range.is_empty() {
189            return Ok(BTreeMap::new());
190        }
191        let min_epoch = epoch_range.start();
192        let max_epoch = epoch_range.end();
193        let read_committee = async |committee_hash| -> Result<Committee, ViewError> {
194            let blob_id = BlobId::new(committee_hash, BlobType::Committee);
195            let committee_blob = self
196                .read_blob(blob_id)
197                .await?
198                .ok_or_else(|| ViewError::NotFound(format!("blob {}", blob_id)))?;
199            Ok(bcs::from_bytes(committee_blob.bytes())?)
200        };
201
202        let network_description = self
203            .read_network_description()
204            .await?
205            .ok_or_else(|| ViewError::NotFound("NetworkDescription not found".to_owned()))?;
206        let admin_chain_id = network_description.admin_chain_id;
207        let mut result = BTreeMap::new();
208        // special case: the genesis epoch is stored in the NetworkDescription
209        if *min_epoch == Epoch::ZERO {
210            let genesis_committee =
211                read_committee(network_description.genesis_committee_blob_hash).await?;
212            result.insert(Epoch::ZERO, genesis_committee);
213        }
214
215        let start_index = min_epoch.0.max(1);
216        let epoch_creation_events = self
217            .read_events_from_index(
218                &admin_chain_id,
219                &StreamId::system(EPOCH_STREAM_NAME),
220                start_index,
221            )
222            .await?;
223
224        result.extend(
225            futures::future::try_join_all(
226                epoch_creation_events
227                    .into_iter()
228                    .take_while(|index_and_event| index_and_event.index <= max_epoch.0)
229                    .map(|index_and_event| async move {
230                        let epoch = Epoch::from(index_and_event.index);
231                        let maybe_blob_hash = bcs::from_bytes::<CryptoHash>(&index_and_event.event);
232                        let committee = read_committee(maybe_blob_hash?).await?;
233                        Result::<_, ViewError>::Ok((epoch, committee))
234                    }),
235            )
236            .await?,
237        );
238
239        Ok(result)
240    }
241
242    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
243    ///
244    /// # Notes
245    ///
246    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
247    /// the same chain active at any given moment, they will race to access persistent storage.
248    /// This can lead to invalid states and data corruption.
249    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
250    where
251        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
252    {
253        let id = description.id();
254        // Store the description blob.
255        self.write_blob(&Blob::new_chain_description(&description))
256            .await?;
257        let mut chain = self.load_chain(id).await?;
258        assert!(!chain.is_active(), "Attempting to create a chain twice");
259        let current_time = self.clock().current_time();
260        chain.ensure_is_active(current_time).await?;
261        chain.save().await?;
262        Ok(())
263    }
264
265    /// Selects the WebAssembly runtime to use for applications (if any).
266    fn wasm_runtime(&self) -> Option<WasmRuntime>;
267
268    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
269    /// by the `application_description`.
270    async fn load_contract(
271        &self,
272        application_description: &ApplicationDescription,
273        txn_tracker: &TransactionTracker,
274    ) -> Result<UserContractCode, ExecutionError> {
275        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
276        let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
277            Some(content) => content.clone(),
278            None => self
279                .read_blob(contract_bytecode_blob_id)
280                .await?
281                .ok_or(ExecutionError::BlobsNotFound(vec![
282                    contract_bytecode_blob_id,
283                ]))?
284                .into_content(),
285        };
286        let compressed_contract_bytecode = CompressedBytecode {
287            compressed_bytes: content.into_arc_bytes(),
288        };
289        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
290        let contract_bytecode =
291            linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
292                move |_| async move { compressed_contract_bytecode.decompress() },
293            )
294            .await
295            .join()
296            .await?;
297        match application_description.module_id.vm_runtime {
298            VmRuntime::Wasm => {
299                cfg_if::cfg_if! {
300                    if #[cfg(with_wasm_runtime)] {
301                        let Some(wasm_runtime) = self.wasm_runtime() else {
302                            panic!("A Wasm runtime is required to load user applications.");
303                        };
304                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
305                           .await?
306                           .into())
307                    } else {
308                        panic!(
309                            "A Wasm runtime is required to load user applications. \
310                             Please enable the `wasmer` or the `wasmtime` feature flags \
311                             when compiling `linera-storage`."
312                        );
313                    }
314                }
315            }
316            VmRuntime::Evm => {
317                cfg_if::cfg_if! {
318                    if #[cfg(with_revm)] {
319                        let evm_runtime = EvmRuntime::Revm;
320                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
321                           .into())
322                    } else {
323                        panic!(
324                            "An Evm runtime is required to load user applications. \
325                             Please enable the `revm` feature flag \
326                             when compiling `linera-storage`."
327                        );
328                    }
329                }
330            }
331        }
332    }
333
334    /// Creates a [`linera-sdk::UserContract`] instance using the bytecode in storage referenced
335    /// by the `application_description`.
336    async fn load_service(
337        &self,
338        application_description: &ApplicationDescription,
339        txn_tracker: &TransactionTracker,
340    ) -> Result<UserServiceCode, ExecutionError> {
341        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
342        let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
343            Some(content) => content.clone(),
344            None => self
345                .read_blob(service_bytecode_blob_id)
346                .await?
347                .ok_or(ExecutionError::BlobsNotFound(vec![
348                    service_bytecode_blob_id,
349                ]))?
350                .into_content(),
351        };
352        let compressed_service_bytecode = CompressedBytecode {
353            compressed_bytes: content.into_arc_bytes(),
354        };
355        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
356        let service_bytecode = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
357            move |_| async move { compressed_service_bytecode.decompress() },
358        )
359        .await
360        .join()
361        .await?;
362        match application_description.module_id.vm_runtime {
363            VmRuntime::Wasm => {
364                cfg_if::cfg_if! {
365                    if #[cfg(with_wasm_runtime)] {
366                        let Some(wasm_runtime) = self.wasm_runtime() else {
367                            panic!("A Wasm runtime is required to load user applications.");
368                        };
369                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
370                           .await?
371                           .into())
372                    } else {
373                        panic!(
374                            "A Wasm runtime is required to load user applications. \
375                             Please enable the `wasmer` or the `wasmtime` feature flags \
376                             when compiling `linera-storage`."
377                        );
378                    }
379                }
380            }
381            VmRuntime::Evm => {
382                cfg_if::cfg_if! {
383                    if #[cfg(with_revm)] {
384                        let evm_runtime = EvmRuntime::Revm;
385                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
386                           .into())
387                    } else {
388                        panic!(
389                            "An Evm runtime is required to load user applications. \
390                             Please enable the `revm` feature flag \
391                             when compiling `linera-storage`."
392                        );
393                    }
394                }
395            }
396        }
397    }
398
399    async fn block_exporter_context(
400        &self,
401        block_exporter_id: u32,
402    ) -> Result<Self::BlockExporterContext, ViewError>;
403}
404
405/// The result of processing the obtained read certificates.
406pub enum ResultReadCertificates {
407    Certificates(Vec<ConfirmedBlockCertificate>),
408    InvalidHashes(Vec<CryptoHash>),
409}
410
411impl ResultReadCertificates {
412    /// Creating the processed read certificates.
413    pub fn new(
414        certificates: Vec<Option<ConfirmedBlockCertificate>>,
415        hashes: Vec<CryptoHash>,
416    ) -> Self {
417        let (certificates, invalid_hashes) = certificates
418            .into_iter()
419            .zip(hashes)
420            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
421                Some(cert) => itertools::Either::Left(cert),
422                None => itertools::Either::Right(hash),
423            });
424        if invalid_hashes.is_empty() {
425            Self::Certificates(certificates)
426        } else {
427            Self::InvalidHashes(invalid_hashes)
428        }
429    }
430}
431
/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
///
/// Lookups of blobs, events, committees and the network description are forwarded to the
/// wrapped storage, while loaded contract and service code is cached per application ID.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// The storage that reads and bytecode loading are delegated to.
    storage: S,
    /// The chain ID reported by `chain_id()`.
    chain_id: ChainId,
    /// The configuration reported by `execution_runtime_config()`.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Cache of contract code already loaded through `get_user_contract`, keyed by
    /// application ID. Held in an `Arc`, so clones of this context share the same map.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Cache of service code already loaded through `get_user_service`, keyed by
    /// application ID. Held in an `Arc`, so clones of this context share the same map.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
}
441
442#[cfg_attr(not(web), async_trait)]
443#[cfg_attr(web, async_trait(?Send))]
444impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
445where
446    S: Storage + Send + Sync,
447{
448    fn chain_id(&self) -> ChainId {
449        self.chain_id
450    }
451
452    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
453        self.execution_runtime_config
454    }
455
456    fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
457        &self.user_contracts
458    }
459
460    fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
461        &self.user_services
462    }
463
464    async fn get_user_contract(
465        &self,
466        description: &ApplicationDescription,
467        txn_tracker: &TransactionTracker,
468    ) -> Result<UserContractCode, ExecutionError> {
469        let application_id = description.into();
470        let pinned = self.user_contracts.pin_owned();
471        if let Some(contract) = pinned.get(&application_id) {
472            return Ok(contract.clone());
473        }
474        let contract = self.storage.load_contract(description, txn_tracker).await?;
475        pinned.insert(application_id, contract.clone());
476        Ok(contract)
477    }
478
479    async fn get_user_service(
480        &self,
481        description: &ApplicationDescription,
482        txn_tracker: &TransactionTracker,
483    ) -> Result<UserServiceCode, ExecutionError> {
484        let application_id = description.into();
485        let pinned = self.user_services.pin_owned();
486        if let Some(service) = pinned.get(&application_id) {
487            return Ok(service.clone());
488        }
489        let service = self.storage.load_service(description, txn_tracker).await?;
490        pinned.insert(application_id, service.clone());
491        Ok(service)
492    }
493
494    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
495        self.storage.read_blob(blob_id).await
496    }
497
498    async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
499        self.storage.read_event(event_id).await
500    }
501
502    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
503        self.storage.read_network_description().await
504    }
505
506    async fn committees_for(
507        &self,
508        epoch_range: RangeInclusive<Epoch>,
509    ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
510        self.storage.committees_for(epoch_range).await
511    }
512
513    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
514        self.storage.contains_blob(blob_id).await
515    }
516
517    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
518        self.storage.contains_event(event_id).await
519    }
520
521    #[cfg(with_testing)]
522    async fn add_blobs(
523        &self,
524        blobs: impl IntoIterator<Item = Blob> + Send,
525    ) -> Result<(), ViewError> {
526        let blobs = Vec::from_iter(blobs);
527        self.storage.write_blobs(&blobs).await
528    }
529
530    #[cfg(with_testing)]
531    async fn add_events(
532        &self,
533        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
534    ) -> Result<(), ViewError> {
535        self.storage.write_events(events).await
536    }
537}
538
/// A clock that can be used to get the current `Timestamp`.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Sleeps for the given duration.
    // NOTE(review): exact semantics are defined by each implementor — confirm there.
    async fn sleep(&self, delta: TimeDelta);

    /// Sleeps until the given timestamp.
    // NOTE(review): exact semantics are defined by each implementor — confirm there.
    async fn sleep_until(&self, timestamp: Timestamp);
}