linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6#![deny(clippy::large_futures)]
7
8mod db_storage;
9
10use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc};
11
12use async_trait::async_trait;
13use dashmap::DashMap;
14use itertools::Itertools;
15use linera_base::{
16    crypto::CryptoHash,
17    data_types::{
18        ApplicationDescription, Blob, ChainDescription, CompressedBytecode, Epoch,
19        NetworkDescription, TimeDelta, Timestamp,
20    },
21    identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
22    vm::VmRuntime,
23};
24use linera_chain::{
25    types::{ConfirmedBlock, ConfirmedBlockCertificate},
26    ChainError, ChainStateView,
27};
28use linera_execution::{
29    committee::Committee, system::EPOCH_STREAM_NAME, BlobState, ExecutionError,
30    ExecutionRuntimeConfig, ExecutionRuntimeContext, UserContractCode, UserServiceCode,
31    WasmRuntime,
32};
33#[cfg(with_revm)]
34use linera_execution::{
35    evm::revm::{EvmContractModule, EvmServiceModule},
36    EvmRuntime,
37};
38#[cfg(with_wasm_runtime)]
39use linera_execution::{WasmContractModule, WasmServiceModule};
40use linera_views::{context::Context, views::RootView, ViewError};
41
42#[cfg(with_metrics)]
43pub use crate::db_storage::metrics;
44#[cfg(with_testing)]
45pub use crate::db_storage::TestClock;
46pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
47
/// The default storage namespace, used when the caller does not specify one.
pub const DEFAULT_NAMESPACE: &str = "table_linera";
50
51/// Communicate with a persistent storage using the "views" abstraction.
52#[cfg_attr(not(web), async_trait)]
53#[cfg_attr(web, async_trait(?Send))]
54pub trait Storage: Sized {
55    /// The low-level storage implementation in use by the core protocol (chain workers etc).
56    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;
57
58    /// The clock type being used.
59    type Clock: Clock;
60
61    /// The low-level storage implementation in use by the block exporter.
62    type BlockExporterContext: Context<Extra = u32> + Clone + Send + Sync + 'static;
63
64    /// Returns the current wall clock time.
65    fn clock(&self) -> &Self::Clock;
66
67    /// Loads the view of a chain state.
68    ///
69    /// # Notes
70    ///
71    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
72    /// instances of the same chain active at any given moment, they will race to access persistent
73    /// storage. This can lead to invalid states and data corruption.
74    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
75
76    /// Tests the existence of a blob with the given blob ID.
77    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
78
79    /// Returns what blobs from the input are missing from storage.
80    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
81
82    /// Tests existence of a blob state with the given blob ID.
83    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
84
85    /// Reads the hashed certificate value with the given hash.
86    async fn read_confirmed_block(
87        &self,
88        hash: CryptoHash,
89    ) -> Result<Option<ConfirmedBlock>, ViewError>;
90
91    /// Reads the blob with the given blob ID.
92    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
93
94    /// Reads the blobs with the given blob IDs.
95    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
96
97    /// Reads the blob state with the given blob ID.
98    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
99
100    /// Reads the blob states with the given blob IDs.
101    async fn read_blob_states(
102        &self,
103        blob_ids: &[BlobId],
104    ) -> Result<Vec<Option<BlobState>>, ViewError>;
105
106    /// Writes the given blob.
107    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
108
109    /// Writes blobs and certificate
110    async fn write_blobs_and_certificate(
111        &self,
112        blobs: &[Blob],
113        certificate: &ConfirmedBlockCertificate,
114    ) -> Result<(), ViewError>;
115
116    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
117    /// blobs that were written.
118    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
119
120    /// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
121    async fn maybe_write_blob_states(
122        &self,
123        blob_ids: &[BlobId],
124        blob_state: BlobState,
125    ) -> Result<(), ViewError>;
126
127    /// Writes several blobs.
128    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
129
130    /// Tests existence of the certificate with the given hash.
131    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
132
133    /// Reads the certificate with the given hash.
134    async fn read_certificate(
135        &self,
136        hash: CryptoHash,
137    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
138
139    /// Reads a number of certificates
140    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
141        &self,
142        hashes: I,
143    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
144
145    /// Reads the event with the given ID.
146    async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
147
148    /// Tests existence of the event with the given ID.
149    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
150
151    /// Lists all the events from a starting index
152    async fn read_events_from_index(
153        &self,
154        chain_id: &ChainId,
155        stream_id: &StreamId,
156        start_index: u32,
157    ) -> Result<Vec<IndexAndEvent>, ViewError>;
158
159    /// Writes a vector of events.
160    async fn write_events(
161        &self,
162        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
163    ) -> Result<(), ViewError>;
164
165    /// Reads the network description.
166    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
167
168    /// Writes the network description.
169    async fn write_network_description(
170        &self,
171        information: &NetworkDescription,
172    ) -> Result<(), ViewError>;
173
174    /// Returns a map of the committees for the given epochs.
175    async fn committees_for(
176        &self,
177        epoch_range: RangeInclusive<Epoch>,
178    ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
179        // Short-circuit for an empty input range.
180        if epoch_range.is_empty() {
181            return Ok(BTreeMap::new());
182        }
183        let min_epoch = epoch_range.start();
184        let max_epoch = epoch_range.end();
185        let read_committee = async |committee_hash| -> Result<Committee, ViewError> {
186            let blob_id = BlobId::new(committee_hash, BlobType::Committee);
187            let committee_blob = self
188                .read_blob(blob_id)
189                .await?
190                .ok_or_else(|| ViewError::NotFound(format!("blob {}", blob_id)))?;
191            Ok(bcs::from_bytes(committee_blob.bytes())?)
192        };
193
194        let network_description = self
195            .read_network_description()
196            .await?
197            .ok_or_else(|| ViewError::NotFound("NetworkDescription not found".to_owned()))?;
198        let admin_chain_id = network_description.admin_chain_id;
199        let mut result = BTreeMap::new();
200        // special case: the genesis epoch is stored in the NetworkDescription
201        if *min_epoch == Epoch::ZERO {
202            let genesis_committee =
203                read_committee(network_description.genesis_committee_blob_hash).await?;
204            result.insert(Epoch::ZERO, genesis_committee);
205        }
206
207        let start_index = min_epoch.0.max(1);
208        let epoch_creation_events = self
209            .read_events_from_index(
210                &admin_chain_id,
211                &StreamId::system(EPOCH_STREAM_NAME),
212                start_index,
213            )
214            .await?;
215
216        result.extend(
217            futures::future::try_join_all(
218                epoch_creation_events
219                    .into_iter()
220                    .take_while(|index_and_event| index_and_event.index <= max_epoch.0)
221                    .map(|index_and_event| async move {
222                        let epoch = Epoch::from(index_and_event.index);
223                        let maybe_blob_hash = bcs::from_bytes::<CryptoHash>(&index_and_event.event);
224                        let committee = read_committee(maybe_blob_hash?).await?;
225                        Result::<_, ViewError>::Ok((epoch, committee))
226                    }),
227            )
228            .await?,
229        );
230
231        Ok(result)
232    }
233
234    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
235    ///
236    /// # Notes
237    ///
238    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
239    /// the same chain active at any given moment, they will race to access persistent storage.
240    /// This can lead to invalid states and data corruption.
241    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
242    where
243        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
244    {
245        let id = description.id();
246        // Store the description blob.
247        self.write_blob(&Blob::new_chain_description(&description))
248            .await?;
249        let mut chain = self.load_chain(id).await?;
250        assert!(!chain.is_active(), "Attempting to create a chain twice");
251        let current_time = self.clock().current_time();
252        chain.ensure_is_active(current_time).await?;
253        chain.save().await?;
254        Ok(())
255    }
256
257    /// Selects the WebAssembly runtime to use for applications (if any).
258    fn wasm_runtime(&self) -> Option<WasmRuntime>;
259
260    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
261    /// by the `application_description`.
262    async fn load_contract(
263        &self,
264        application_description: &ApplicationDescription,
265    ) -> Result<UserContractCode, ExecutionError> {
266        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
267        let contract_blob = self.read_blob(contract_bytecode_blob_id).await?.ok_or(
268            ExecutionError::BlobsNotFound(vec![contract_bytecode_blob_id]),
269        )?;
270        let compressed_contract_bytecode = CompressedBytecode {
271            compressed_bytes: contract_blob.into_bytes().to_vec(),
272        };
273        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
274        let contract_bytecode =
275            linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
276                move |_| async move { compressed_contract_bytecode.decompress() },
277            )
278            .await
279            .join()
280            .await?;
281        match application_description.module_id.vm_runtime {
282            VmRuntime::Wasm => {
283                cfg_if::cfg_if! {
284                    if #[cfg(with_wasm_runtime)] {
285                        let Some(wasm_runtime) = self.wasm_runtime() else {
286                            panic!("A Wasm runtime is required to load user applications.");
287                        };
288                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
289                           .await?
290                           .into())
291                    } else {
292                        panic!(
293                            "A Wasm runtime is required to load user applications. \
294                             Please enable the `wasmer` or the `wasmtime` feature flags \
295                             when compiling `linera-storage`."
296                        );
297                    }
298                }
299            }
300            VmRuntime::Evm => {
301                cfg_if::cfg_if! {
302                    if #[cfg(with_revm)] {
303                        let evm_runtime = EvmRuntime::Revm;
304                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)
305                           .await?
306                           .into())
307                    } else {
308                        panic!(
309                            "An Evm runtime is required to load user applications. \
310                             Please enable the `revm` feature flag \
311                             when compiling `linera-storage`."
312                        );
313                    }
314                }
315            }
316        }
317    }
318
319    /// Creates a [`linera-sdk::UserContract`] instance using the bytecode in storage referenced
320    /// by the `application_description`.
321    async fn load_service(
322        &self,
323        application_description: &ApplicationDescription,
324    ) -> Result<UserServiceCode, ExecutionError> {
325        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
326        let service_blob = self.read_blob(service_bytecode_blob_id).await?.ok_or(
327            ExecutionError::BlobsNotFound(vec![service_bytecode_blob_id]),
328        )?;
329        let compressed_service_bytecode = CompressedBytecode {
330            compressed_bytes: service_blob.into_bytes().to_vec(),
331        };
332        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
333        let service_bytecode = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
334            move |_| async move { compressed_service_bytecode.decompress() },
335        )
336        .await
337        .join()
338        .await?;
339        match application_description.module_id.vm_runtime {
340            VmRuntime::Wasm => {
341                cfg_if::cfg_if! {
342                    if #[cfg(with_wasm_runtime)] {
343                        let Some(wasm_runtime) = self.wasm_runtime() else {
344                            panic!("A Wasm runtime is required to load user applications.");
345                        };
346                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
347                           .await?
348                           .into())
349                    } else {
350                        panic!(
351                            "A Wasm runtime is required to load user applications. \
352                             Please enable the `wasmer` or the `wasmtime` feature flags \
353                             when compiling `linera-storage`."
354                        );
355                    }
356                }
357            }
358            VmRuntime::Evm => {
359                cfg_if::cfg_if! {
360                    if #[cfg(with_revm)] {
361                        let evm_runtime = EvmRuntime::Revm;
362                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)
363                           .await?
364                           .into())
365                    } else {
366                        panic!(
367                            "An Evm runtime is required to load user applications. \
368                             Please enable the `revm` feature flag \
369                             when compiling `linera-storage`."
370                        );
371                    }
372                }
373            }
374        }
375    }
376
377    async fn block_exporter_context(
378        &self,
379        block_exporter_id: u32,
380    ) -> Result<Self::BlockExporterContext, ViewError>;
381}
382
383/// The result of processing the obtained read certificates.
384pub enum ResultReadCertificates {
385    Certificates(Vec<ConfirmedBlockCertificate>),
386    InvalidHashes(Vec<CryptoHash>),
387}
388
389impl ResultReadCertificates {
390    /// Creating the processed read certificates.
391    pub fn new(
392        certificates: Vec<Option<ConfirmedBlockCertificate>>,
393        hashes: Vec<CryptoHash>,
394    ) -> Self {
395        let (certificates, invalid_hashes) = certificates
396            .into_iter()
397            .zip(hashes)
398            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
399                Some(cert) => itertools::Either::Left(cert),
400                None => itertools::Either::Right(hash),
401            });
402        if invalid_hashes.is_empty() {
403            Self::Certificates(certificates)
404        } else {
405            Self::InvalidHashes(invalid_hashes)
406        }
407    }
408}
409
410/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
411#[derive(Clone)]
412pub struct ChainRuntimeContext<S> {
413    storage: S,
414    chain_id: ChainId,
415    execution_runtime_config: ExecutionRuntimeConfig,
416    user_contracts: Arc<DashMap<ApplicationId, UserContractCode>>,
417    user_services: Arc<DashMap<ApplicationId, UserServiceCode>>,
418}
419
420#[cfg_attr(not(web), async_trait)]
421#[cfg_attr(web, async_trait(?Send))]
422impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
423where
424    S: Storage + Send + Sync,
425{
426    fn chain_id(&self) -> ChainId {
427        self.chain_id
428    }
429
430    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
431        self.execution_runtime_config
432    }
433
434    fn user_contracts(&self) -> &Arc<DashMap<ApplicationId, UserContractCode>> {
435        &self.user_contracts
436    }
437
438    fn user_services(&self) -> &Arc<DashMap<ApplicationId, UserServiceCode>> {
439        &self.user_services
440    }
441
442    async fn get_user_contract(
443        &self,
444        description: &ApplicationDescription,
445    ) -> Result<UserContractCode, ExecutionError> {
446        let application_id = description.into();
447        if let Some(contract) = self.user_contracts.get(&application_id) {
448            return Ok(contract.clone());
449        }
450        let contract = self.storage.load_contract(description).await?;
451        self.user_contracts.insert(application_id, contract.clone());
452        Ok(contract)
453    }
454
455    async fn get_user_service(
456        &self,
457        description: &ApplicationDescription,
458    ) -> Result<UserServiceCode, ExecutionError> {
459        let application_id = description.into();
460        if let Some(service) = self.user_services.get(&application_id) {
461            return Ok(service.clone());
462        }
463        let service = self.storage.load_service(description).await?;
464        self.user_services.insert(application_id, service.clone());
465        Ok(service)
466    }
467
468    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
469        self.storage.read_blob(blob_id).await
470    }
471
472    async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
473        self.storage.read_event(event_id).await
474    }
475
476    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
477        self.storage.read_network_description().await
478    }
479
480    async fn committees_for(
481        &self,
482        epoch_range: RangeInclusive<Epoch>,
483    ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
484        self.storage.committees_for(epoch_range).await
485    }
486
487    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
488        self.storage.contains_blob(blob_id).await
489    }
490
491    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
492        self.storage.contains_event(event_id).await
493    }
494
495    #[cfg(with_testing)]
496    async fn add_blobs(
497        &self,
498        blobs: impl IntoIterator<Item = Blob> + Send,
499    ) -> Result<(), ViewError> {
500        let blobs = Vec::from_iter(blobs);
501        self.storage.write_blobs(&blobs).await
502    }
503
504    #[cfg(with_testing)]
505    async fn add_events(
506        &self,
507        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
508    ) -> Result<(), ViewError> {
509        self.storage.write_events(events).await
510    }
511}
512
513/// A clock that can be used to get the current `Timestamp`.
514#[cfg_attr(not(web), async_trait)]
515#[cfg_attr(web, async_trait(?Send))]
516pub trait Clock {
517    fn current_time(&self) -> Timestamp;
518
519    async fn sleep(&self, delta: TimeDelta);
520
521    async fn sleep_until(&self, timestamp: Timestamp);
522}