linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6#![deny(clippy::large_futures)]
7
8mod db_storage;
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use dashmap::{mapref::entry::Entry, DashMap};
14use linera_base::{
15    crypto::CryptoHash,
16    data_types::{
17        ApplicationDescription, Blob, ChainDescription, CompressedBytecode, NetworkDescription,
18        TimeDelta, Timestamp,
19    },
20    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
21    vm::VmRuntime,
22};
23use linera_chain::{
24    types::{ConfirmedBlock, ConfirmedBlockCertificate},
25    ChainError, ChainStateView,
26};
27#[cfg(with_revm)]
28use linera_execution::{
29    evm::revm::{EvmContractModule, EvmServiceModule},
30    EvmRuntime,
31};
32use linera_execution::{
33    BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, UserContractCode,
34    UserServiceCode, WasmRuntime,
35};
36#[cfg(with_wasm_runtime)]
37use linera_execution::{WasmContractModule, WasmServiceModule};
38use linera_views::{context::Context, views::RootView, ViewError};
39
40#[cfg(with_metrics)]
41pub use crate::db_storage::metrics;
42#[cfg(with_testing)]
43pub use crate::db_storage::TestClock;
44pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
45
/// The default namespace to use when no namespace is specified.
47pub const DEFAULT_NAMESPACE: &str = "table_linera";
48
49/// Communicate with a persistent storage using the "views" abstraction.
50#[cfg_attr(not(web), async_trait)]
51#[cfg_attr(web, async_trait(?Send))]
52pub trait Storage: Sized {
53    /// The low-level storage implementation in use by the core protocol (chain workers etc).
54    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;
55
56    /// The clock type being used.
57    type Clock: Clock;
58
59    /// The low-level storage implementation in use by the block exporter.
60    type BlockExporterContext: Context<Extra = u32> + Clone + Send + Sync + 'static;
61
62    /// Returns the current wall clock time.
63    fn clock(&self) -> &Self::Clock;
64
65    /// Loads the view of a chain state.
66    ///
67    /// # Notes
68    ///
69    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
70    /// instances of the same chain active at any given moment, they will race to access persistent
71    /// storage. This can lead to invalid states and data corruption.
72    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
73
74    /// Tests the existence of a blob with the given blob ID.
75    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
76
77    /// Returns what blobs from the input are missing from storage.
78    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
79
80    /// Tests existence of a blob state with the given blob ID.
81    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
82
83    /// Reads the hashed certificate value with the given hash.
84    async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, ViewError>;
85
86    /// Reads the blob with the given blob ID.
87    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
88
89    /// Reads the blobs with the given blob IDs.
90    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
91
92    /// Reads the blob state with the given blob ID.
93    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
94
95    /// Reads the blob states with the given blob IDs.
96    async fn read_blob_states(
97        &self,
98        blob_ids: &[BlobId],
99    ) -> Result<Vec<Option<BlobState>>, ViewError>;
100
101    /// Reads the hashed certificate values in descending order from the given hash.
102    async fn read_confirmed_blocks_downward(
103        &self,
104        from: CryptoHash,
105        limit: u32,
106    ) -> Result<Vec<ConfirmedBlock>, ViewError>;
107
108    /// Writes the given blob.
109    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
110
111    /// Writes blobs and certificate
112    async fn write_blobs_and_certificate(
113        &self,
114        blobs: &[Blob],
115        certificate: &ConfirmedBlockCertificate,
116    ) -> Result<(), ViewError>;
117
118    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
119    /// blobs that were written.
120    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
121
122    /// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
123    async fn maybe_write_blob_states(
124        &self,
125        blob_ids: &[BlobId],
126        blob_state: BlobState,
127    ) -> Result<(), ViewError>;
128
129    /// Writes several blobs.
130    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
131
132    /// Tests existence of the certificate with the given hash.
133    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
134
135    /// Reads the certificate with the given hash.
136    async fn read_certificate(
137        &self,
138        hash: CryptoHash,
139    ) -> Result<ConfirmedBlockCertificate, ViewError>;
140
141    /// Reads a number of certificates
142    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
143        &self,
144        hashes: I,
145    ) -> Result<Vec<ConfirmedBlockCertificate>, ViewError>;
146
147    /// Reads the event with the given ID.
148    async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
149
150    /// Tests existence of the event with the given ID.
151    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
152
153    /// Lists all the events from a starting index
154    async fn read_events_from_index(
155        &self,
156        chain_id: &ChainId,
157        stream_id: &StreamId,
158        start_index: u32,
159    ) -> Result<Vec<IndexAndEvent>, ViewError>;
160
161    /// Writes a vector of events.
162    async fn write_events(
163        &self,
164        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
165    ) -> Result<(), ViewError>;
166
167    /// Reads the network description.
168    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
169
170    /// Writes the network description.
171    async fn write_network_description(
172        &self,
173        information: &NetworkDescription,
174    ) -> Result<(), ViewError>;
175
176    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
177    ///
178    /// # Notes
179    ///
180    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
181    /// the same chain active at any given moment, they will race to access persistent storage.
182    /// This can lead to invalid states and data corruption.
183    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
184    where
185        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
186    {
187        let id = description.id();
188        // Store the description blob.
189        self.write_blob(&Blob::new_chain_description(&description))
190            .await?;
191        let mut chain = self.load_chain(id).await?;
192        assert!(!chain.is_active(), "Attempting to create a chain twice");
193        let current_time = self.clock().current_time();
194        chain.ensure_is_active(current_time).await?;
195        chain.save().await?;
196        Ok(())
197    }
198
199    /// Selects the WebAssembly runtime to use for applications (if any).
200    fn wasm_runtime(&self) -> Option<WasmRuntime>;
201
202    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
203    /// by the `application_description`.
204    async fn load_contract(
205        &self,
206        application_description: &ApplicationDescription,
207    ) -> Result<UserContractCode, ExecutionError> {
208        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
209        let contract_blob = self.read_blob(contract_bytecode_blob_id).await?.ok_or(
210            ExecutionError::BlobsNotFound(vec![contract_bytecode_blob_id]),
211        )?;
212        let compressed_contract_bytecode = CompressedBytecode {
213            compressed_bytes: contract_blob.into_bytes().to_vec(),
214        };
215        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
216        let contract_bytecode =
217            linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
218                move |_| async move { compressed_contract_bytecode.decompress() },
219            )
220            .await
221            .join()
222            .await?;
223        match application_description.module_id.vm_runtime {
224            VmRuntime::Wasm => {
225                cfg_if::cfg_if! {
226                    if #[cfg(with_wasm_runtime)] {
227                        let Some(wasm_runtime) = self.wasm_runtime() else {
228                            panic!("A Wasm runtime is required to load user applications.");
229                        };
230                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
231                           .await?
232                           .into())
233                    } else {
234                        panic!(
235                            "A Wasm runtime is required to load user applications. \
236                             Please enable the `wasmer` or the `wasmtime` feature flags \
237                             when compiling `linera-storage`."
238                        );
239                    }
240                }
241            }
242            VmRuntime::Evm => {
243                cfg_if::cfg_if! {
244                    if #[cfg(with_revm)] {
245                        let evm_runtime = EvmRuntime::Revm;
246                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)
247                           .await?
248                           .into())
249                    } else {
250                        panic!(
251                            "An Evm runtime is required to load user applications. \
252                             Please enable the `revm` feature flag \
253                             when compiling `linera-storage`."
254                        );
255                    }
256                }
257            }
258        }
259    }
260
261    /// Creates a [`linera-sdk::UserContract`] instance using the bytecode in storage referenced
262    /// by the `application_description`.
263    async fn load_service(
264        &self,
265        application_description: &ApplicationDescription,
266    ) -> Result<UserServiceCode, ExecutionError> {
267        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
268        let service_blob = self.read_blob(service_bytecode_blob_id).await?.ok_or(
269            ExecutionError::BlobsNotFound(vec![service_bytecode_blob_id]),
270        )?;
271        let compressed_service_bytecode = CompressedBytecode {
272            compressed_bytes: service_blob.into_bytes().to_vec(),
273        };
274        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
275        let service_bytecode = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
276            move |_| async move { compressed_service_bytecode.decompress() },
277        )
278        .await
279        .join()
280        .await?;
281        match application_description.module_id.vm_runtime {
282            VmRuntime::Wasm => {
283                cfg_if::cfg_if! {
284                    if #[cfg(with_wasm_runtime)] {
285                        let Some(wasm_runtime) = self.wasm_runtime() else {
286                            panic!("A Wasm runtime is required to load user applications.");
287                        };
288                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
289                           .await?
290                           .into())
291                    } else {
292                        panic!(
293                            "A Wasm runtime is required to load user applications. \
294                             Please enable the `wasmer` or the `wasmtime` feature flags \
295                             when compiling `linera-storage`."
296                        );
297                    }
298                }
299            }
300            VmRuntime::Evm => {
301                cfg_if::cfg_if! {
302                    if #[cfg(with_revm)] {
303                        let evm_runtime = EvmRuntime::Revm;
304                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)
305                           .await?
306                           .into())
307                    } else {
308                        panic!(
309                            "An Evm runtime is required to load user applications. \
310                             Please enable the `revm` feature flag \
311                             when compiling `linera-storage`."
312                        );
313                    }
314                }
315            }
316        }
317    }
318
319    async fn block_exporter_context(
320        &self,
321        block_exporter_id: u32,
322    ) -> Result<Self::BlockExporterContext, ViewError>;
323}
324
/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
///
/// It pairs a storage handle with per-chain settings and with shared maps of already
/// loaded user application code.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// The storage that blobs, events, the network description and application bytecode
    /// are read from.
    storage: S,
    /// The ID of the chain this context belongs to.
    chain_id: ChainId,
    /// The execution runtime configuration reported by this context.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Contract code that has already been loaded, keyed by application ID. The map sits
    /// behind an `Arc`, so clones of this context share it.
    user_contracts: Arc<DashMap<ApplicationId, UserContractCode>>,
    /// Service code that has already been loaded, keyed by application ID. The map sits
    /// behind an `Arc`, so clones of this context share it.
    user_services: Arc<DashMap<ApplicationId, UserServiceCode>>,
}
334
335#[cfg_attr(not(web), async_trait)]
336#[cfg_attr(web, async_trait(?Send))]
337impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
338where
339    S: Storage + Send + Sync,
340{
341    fn chain_id(&self) -> ChainId {
342        self.chain_id
343    }
344
345    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
346        self.execution_runtime_config
347    }
348
349    fn user_contracts(&self) -> &Arc<DashMap<ApplicationId, UserContractCode>> {
350        &self.user_contracts
351    }
352
353    fn user_services(&self) -> &Arc<DashMap<ApplicationId, UserServiceCode>> {
354        &self.user_services
355    }
356
357    async fn get_user_contract(
358        &self,
359        description: &ApplicationDescription,
360    ) -> Result<UserContractCode, ExecutionError> {
361        match self.user_contracts.entry(description.into()) {
362            Entry::Occupied(entry) => Ok(entry.get().clone()),
363            Entry::Vacant(entry) => {
364                let contract = self.storage.load_contract(description).await?;
365                entry.insert(contract.clone());
366                Ok(contract)
367            }
368        }
369    }
370
371    async fn get_user_service(
372        &self,
373        description: &ApplicationDescription,
374    ) -> Result<UserServiceCode, ExecutionError> {
375        match self.user_services.entry(description.into()) {
376            Entry::Occupied(entry) => Ok(entry.get().clone()),
377            Entry::Vacant(entry) => {
378                let service = self.storage.load_service(description).await?;
379                entry.insert(service.clone());
380                Ok(service)
381            }
382        }
383    }
384
385    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
386        self.storage.read_blob(blob_id).await
387    }
388
389    async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
390        self.storage.read_event(event_id).await
391    }
392
393    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
394        self.storage.read_network_description().await
395    }
396
397    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
398        self.storage.contains_blob(blob_id).await
399    }
400
401    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
402        self.storage.contains_event(event_id).await
403    }
404
405    #[cfg(with_testing)]
406    async fn add_blobs(
407        &self,
408        blobs: impl IntoIterator<Item = Blob> + Send,
409    ) -> Result<(), ViewError> {
410        let blobs = Vec::from_iter(blobs);
411        self.storage.write_blobs(&blobs).await
412    }
413
414    #[cfg(with_testing)]
415    async fn add_events(
416        &self,
417        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
418    ) -> Result<(), ViewError> {
419        self.storage.write_events(events).await
420    }
421}
422
/// A clock that can be used to get the current `Timestamp`.
///
/// Besides reading the time, a clock lets callers wait, either for a duration or until a
/// given point in time. How the waiting is realized is up to the implementation; the
/// clocks re-exported by this crate live in `crate::db_storage`.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Waits for the duration `delta`.
    // NOTE(review): presumably measured against this clock's own notion of time, which
    // need not be wall time — confirm against the implementations.
    async fn sleep(&self, delta: TimeDelta);

    /// Waits until this clock reaches `timestamp`.
    // NOTE(review): behavior for a `timestamp` in the past is not visible here; presumably
    // it returns immediately — confirm against the implementations.
    async fn sleep_until(&self, timestamp: Timestamp);
}