Skip to main content

linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6#![deny(missing_docs)]
7
8mod db_storage;
9
10use std::sync::Arc as StdArc;
11
12use async_trait::async_trait;
13use itertools::Itertools;
14use linera_base::{
15    crypto::CryptoHash,
16    data_types::{
17        ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode, Epoch,
18        NetworkDescription, Timestamp,
19    },
20    identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
21    vm::VmRuntime,
22};
23pub use linera_cache::{Arc, DEFAULT_CLEANUP_INTERVAL_SECS};
24use linera_chain::{
25    types::{ConfirmedBlock, ConfirmedBlockCertificate},
26    ChainError, ChainStateView,
27};
28use linera_execution::{
29    committee::Committee, BlobState, ExecutionError, ExecutionRuntimeConfig,
30    ExecutionRuntimeContext, SharedCommittees, TransactionTracker, UserContractCode,
31    UserServiceCode, WasmRuntime,
32};
33#[cfg(with_revm)]
34use linera_execution::{
35    evm::revm::{EvmContractModule, EvmServiceModule},
36    EvmRuntime,
37};
38#[cfg(with_wasm_runtime)]
39use linera_execution::{WasmContractModule, WasmServiceModule};
40use linera_views::{context::Context, views::RootView, ViewError};
41
42#[cfg(with_metrics)]
43pub use crate::db_storage::metrics;
44pub use crate::db_storage::{
45    ChainStatesFirstAssignment, DbStorage, RootKey, StorageCacheConfig, StorageCaches, WallClock,
46};
47#[cfg(with_testing)]
48pub use crate::db_storage::{TestClock, DEFAULT_STORAGE_CACHE_CONFIG};
49
/// The default namespace to be used when none is specified.
pub const DEFAULT_NAMESPACE: &str = "default";
52
53/// Communicate with a persistent storage using the "views" abstraction.
54#[cfg_attr(not(web), async_trait)]
55#[cfg_attr(web, async_trait(?Send))]
56pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
57    /// The low-level storage implementation in use by the core protocol (chain workers etc).
58    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
59
60    /// The clock type being used.
61    type Clock: Clock + Clone + Send + Sync;
62
63    /// The low-level storage implementation in use by the block exporter.
64    type BlockExporterContext: Context<Extra = u32> + Clone;
65
66    /// Returns the current wall clock time.
67    fn clock(&self) -> &Self::Clock;
68
69    /// Returns the thread pool used to run blocking work such as bytecode decompression.
70    fn thread_pool(&self) -> &StdArc<linera_execution::ThreadPool>;
71
72    /// Loads the view of a chain state.
73    ///
74    /// # Notes
75    ///
76    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
77    /// instances of the same chain active at any given moment, they will race to access persistent
78    /// storage. This can lead to invalid states and data corruption.
79    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
80
81    /// Tests the existence of a blob with the given blob ID.
82    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
83
84    /// Returns what blobs from the input are missing from storage.
85    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
86
87    /// Tests existence of a blob state with the given blob ID.
88    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
89
90    /// Reads the hashed certificate value with the given hash.
91    async fn read_confirmed_block(
92        &self,
93        hash: CryptoHash,
94    ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError>;
95
96    /// Reads a number of confirmed blocks by their hashes.
97    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
98        &self,
99        hashes: I,
100    ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError>;
101
102    /// Reads the blob with the given blob ID.
103    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError>;
104
105    /// Reads the blobs with the given blob IDs.
106    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError>;
107
108    /// Reads the blob state with the given blob ID.
109    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
110
111    /// Reads the blob states with the given blob IDs.
112    async fn read_blob_states(
113        &self,
114        blob_ids: &[BlobId],
115    ) -> Result<Vec<Option<BlobState>>, ViewError>;
116
117    /// Writes the given blob.
118    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
119
120    /// Writes blobs and certificate
121    async fn write_blobs_and_certificate(
122        &self,
123        blobs: &[Blob],
124        certificate: &ConfirmedBlockCertificate,
125    ) -> Result<(), ViewError>;
126
127    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
128    /// blobs that were written.
129    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
130
131    /// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
132    async fn maybe_write_blob_states(
133        &self,
134        blob_ids: &[BlobId],
135        blob_state: BlobState,
136    ) -> Result<(), ViewError>;
137
138    /// Writes several blobs.
139    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
140
141    /// Tests existence of the certificate with the given hash.
142    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
143
144    /// Inserts a certificate into the in-memory dedup cache and returns the
145    /// canonical [`Arc`]. If the cache already holds an `Arc` for this hash,
146    /// the passed-in `certificate` is dropped and the existing `Arc` is
147    /// returned. This must be used (rather than `Arc::new`) for any
148    /// freshly-constructed [`ConfirmedBlockCertificate`] that should
149    /// participate in the "one allocation per content" invariant.
150    fn cache_certificate(
151        &self,
152        certificate: ConfirmedBlockCertificate,
153    ) -> Arc<ConfirmedBlockCertificate>;
154
155    /// Inserts a blob into the in-memory dedup cache and returns the canonical
156    /// [`Arc`]. If the cache already holds an `Arc` for this blob ID, the
157    /// passed-in `blob` is dropped and the existing `Arc` is returned. This
158    /// must be used (rather than `Arc::new`) for any freshly-constructed
159    /// [`Blob`] that should participate in the "one allocation per content"
160    /// invariant.
161    fn cache_blob(&self, blob: Blob) -> Arc<Blob>;
162
163    /// Inserts a confirmed block into the in-memory dedup cache and returns
164    /// the canonical [`Arc`]. If the cache already holds an `Arc` for this
165    /// hash, the passed-in `block` is dropped and the existing `Arc` is
166    /// returned. This must be used (rather than `Arc::new`) for any
167    /// freshly-constructed [`ConfirmedBlock`] that should participate in the
168    /// "one allocation per content" invariant.
169    fn cache_confirmed_block(&self, block: ConfirmedBlock) -> Arc<ConfirmedBlock>;
170
171    /// Reads the certificate with the given hash.
172    async fn read_certificate(
173        &self,
174        hash: CryptoHash,
175    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError>;
176
177    /// Reads a number of certificates
178    async fn read_certificates(
179        &self,
180        hashes: &[CryptoHash],
181    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
182
183    /// Reads raw certificate bytes by hashes.
184    ///
185    /// Returns a vector where each element corresponds to the input hash.
186    /// Elements are `None` if no certificate exists for that hash.
187    /// Each found certificate is returned as `Some((lite_certificate_bytes, confirmed_block_bytes))`.
188    async fn read_certificates_raw(
189        &self,
190        hashes: &[CryptoHash],
191    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
192
193    /// Reads certificates by heights for a given chain.
194    /// Returns a vector where each element corresponds to the input height.
195    /// Elements are `None` if no certificate exists at that height.
196    async fn read_certificates_by_heights(
197        &self,
198        chain_id: ChainId,
199        heights: &[BlockHeight],
200    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
201
202    /// Reads raw certificates by heights for a given chain.
203    /// Returns a vector where each element corresponds to the input height.
204    /// Elements are `None` if no certificate exists at that height.
205    /// Each found certificate is returned as a tuple of (lite_certificate_bytes, confirmed_block_bytes).
206    async fn read_certificates_by_heights_raw(
207        &self,
208        chain_id: ChainId,
209        heights: &[BlockHeight],
210    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
211
212    /// Returns a vector of certificate hashes for the requested chain and heights.
213    /// The resulting vector maintains the order of the input `heights` argument.
214    /// Elements are `None` if no certificate exists at that height.
215    async fn read_certificate_hashes_by_heights(
216        &self,
217        chain_id: ChainId,
218        heights: &[BlockHeight],
219    ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
220
221    /// Looks up the block heights where the given events were published.
222    /// Returns `None` for events that are not in the index.
223    async fn read_event_block_heights(
224        &self,
225        event_ids: &[EventId],
226    ) -> Result<Vec<Option<BlockHeight>>, ViewError>;
227
228    /// Reads the event with the given ID.
229    async fn read_event(&self, id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError>;
230
231    /// Tests existence of the event with the given ID.
232    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
233
234    /// Lists all the events from a starting index
235    async fn read_events_from_index(
236        &self,
237        chain_id: &ChainId,
238        stream_id: &StreamId,
239        start_index: u32,
240    ) -> Result<Vec<IndexAndEvent>, ViewError>;
241
242    /// Writes a vector of events.
243    async fn write_events(
244        &self,
245        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
246    ) -> Result<(), ViewError>;
247
248    /// Reads the network description.
249    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
250
251    /// Writes the network description.
252    async fn write_network_description(
253        &self,
254        information: &NetworkDescription,
255    ) -> Result<(), ViewError>;
256
257    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
258    ///
259    /// # Notes
260    ///
261    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
262    /// the same chain active at any given moment, they will race to access persistent storage.
263    /// This can lead to invalid states and data corruption.
264    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
265    where
266        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
267    {
268        let id = description.id();
269        // Store the description blob and a `Genesis` blob state for it. The blob
270        // is not published by any block, so its provenance is the genesis config
271        // rather than a particular `(chain_id, block_height)`.
272        let description_blob = Blob::new_chain_description(&description);
273        let description_blob_id = description_blob.id();
274        self.write_blob(&description_blob).await?;
275        self.maybe_write_blob_states(&[description_blob_id], BlobState::GENESIS)
276            .await?;
277        let mut chain = self.load_chain(id).await?;
278        assert!(
279            !chain.is_active().await?,
280            "Attempting to create a chain twice"
281        );
282        let current_time = self.clock().current_time();
283        chain.initialize_if_needed(current_time).await?;
284        chain.save().await?;
285        Ok(())
286    }
287
288    /// Selects the WebAssembly runtime to use for applications (if any).
289    fn wasm_runtime(&self) -> Option<WasmRuntime>;
290
291    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
292    /// by the `application_description`.
293    async fn load_contract(
294        &self,
295        application_description: &ApplicationDescription,
296        txn_tracker: &TransactionTracker,
297    ) -> Result<UserContractCode, ExecutionError> {
298        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
299        let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
300            Some(content) => content.clone(),
301            None => self
302                .read_blob(contract_bytecode_blob_id)
303                .await?
304                .ok_or(ExecutionError::BlobsNotFound(vec![
305                    contract_bytecode_blob_id,
306                ]))?
307                .content()
308                .clone(),
309        };
310        let compressed_contract_bytecode = CompressedBytecode {
311            compressed_bytes: content.into_arc_bytes(),
312        };
313        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
314        let contract_bytecode = self
315            .thread_pool()
316            .run_send((), move |()| async move {
317                compressed_contract_bytecode.decompress()
318            })
319            .await
320            .await??;
321        match application_description.module_id.vm_runtime {
322            VmRuntime::Wasm => {
323                cfg_if::cfg_if! {
324                    if #[cfg(with_wasm_runtime)] {
325                        let Some(wasm_runtime) = self.wasm_runtime() else {
326                            panic!("A Wasm runtime is required to load user applications.");
327                        };
328                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
329                           .await?
330                           .into())
331                    } else {
332                        panic!(
333                            "A Wasm runtime is required to load user applications. \
334                             Please enable the `wasmer` or the `wasmtime` feature flags \
335                             when compiling `linera-storage`."
336                        );
337                    }
338                }
339            }
340            VmRuntime::Evm => {
341                cfg_if::cfg_if! {
342                    if #[cfg(with_revm)] {
343                        let evm_runtime = EvmRuntime::Revm;
344                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
345                           .into())
346                    } else {
347                        panic!(
348                            "An Evm runtime is required to load user applications. \
349                             Please enable the `revm` feature flag \
350                             when compiling `linera-storage`."
351                        );
352                    }
353                }
354            }
355        }
356    }
357
358    /// Creates a [`UserServiceCode`] instance using the bytecode in storage referenced
359    /// by the `application_description`.
360    async fn load_service(
361        &self,
362        application_description: &ApplicationDescription,
363        txn_tracker: &TransactionTracker,
364    ) -> Result<UserServiceCode, ExecutionError> {
365        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
366        let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
367            Some(content) => content.clone(),
368            None => self
369                .read_blob(service_bytecode_blob_id)
370                .await?
371                .ok_or(ExecutionError::BlobsNotFound(vec![
372                    service_bytecode_blob_id,
373                ]))?
374                .content()
375                .clone(),
376        };
377        let compressed_service_bytecode = CompressedBytecode {
378            compressed_bytes: content.into_arc_bytes(),
379        };
380        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
381        let service_bytecode = self
382            .thread_pool()
383            .run_send((), move |()| async move {
384                compressed_service_bytecode.decompress()
385            })
386            .await
387            .await??;
388        match application_description.module_id.vm_runtime {
389            VmRuntime::Wasm => {
390                cfg_if::cfg_if! {
391                    if #[cfg(with_wasm_runtime)] {
392                        let Some(wasm_runtime) = self.wasm_runtime() else {
393                            panic!("A Wasm runtime is required to load user applications.");
394                        };
395                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
396                           .await?
397                           .into())
398                    } else {
399                        panic!(
400                            "A Wasm runtime is required to load user applications. \
401                             Please enable the `wasmer` or the `wasmtime` feature flags \
402                             when compiling `linera-storage`."
403                        );
404                    }
405                }
406            }
407            VmRuntime::Evm => {
408                cfg_if::cfg_if! {
409                    if #[cfg(with_revm)] {
410                        let evm_runtime = EvmRuntime::Revm;
411                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
412                           .into())
413                    } else {
414                        panic!(
415                            "An Evm runtime is required to load user applications. \
416                             Please enable the `revm` feature flag \
417                             when compiling `linera-storage`."
418                        );
419                    }
420                }
421            }
422        }
423    }
424
425    /// Returns the storage context used by the block exporter with the given ID.
426    async fn block_exporter_context(
427        &self,
428        block_exporter_id: u32,
429    ) -> Result<Self::BlockExporterContext, ViewError>;
430
431    /// Returns the process-wide committee cache shared by all chains.
432    fn shared_committees(&self) -> &SharedCommittees;
433
434    /// Returns the committee whose serialized form hashes to `hash`, loading it
435    /// from the blob store on cache miss.
436    async fn get_or_load_committee_by_hash(
437        &self,
438        hash: CryptoHash,
439    ) -> Result<StdArc<Committee>, ExecutionError> {
440        if let Some(committee) = self.shared_committees().get(hash) {
441            return Ok(committee);
442        }
443        let blob_id = BlobId::new(hash, BlobType::Committee);
444        let blob = self
445            .read_blob(blob_id)
446            .await?
447            .ok_or(ExecutionError::BlobsNotFound(vec![blob_id]))?;
448        let committee = bcs::from_bytes(blob.bytes())?;
449        Ok(self
450            .shared_committees()
451            .insert(hash, StdArc::new(committee)))
452    }
453
454    /// Returns whether the given epoch's committee has been revoked, i.e. whether the
455    /// admin chain has written a `REMOVED_EPOCH_STREAM` event for it.
456    async fn is_epoch_revoked(&self, epoch: Epoch) -> Result<bool, ExecutionError> {
457        let net_desc = self
458            .read_network_description()
459            .await?
460            .ok_or(ExecutionError::NoNetworkDescriptionFound)?;
461        let event_id = EventId {
462            chain_id: net_desc.admin_chain_id,
463            stream_id: StreamId::system(linera_execution::system::REMOVED_EPOCH_STREAM_NAME),
464            index: epoch.0,
465        };
466        Ok(self.contains_event(event_id).await?)
467    }
468
469    /// Returns the committee that signs blocks in the given epoch, looking up its blob
470    /// hash via the admin chain's epoch event stream (or the genesis committee for
471    /// epoch 0). Returns `Ok(None)` if the corresponding event has not been written
472    /// to local storage yet.
473    async fn committee_for_epoch(
474        &self,
475        epoch: Epoch,
476    ) -> Result<Option<StdArc<Committee>>, ExecutionError> {
477        let blob_hash = if epoch == Epoch::ZERO {
478            self.read_network_description()
479                .await?
480                .ok_or(ExecutionError::NoNetworkDescriptionFound)?
481                .genesis_committee_blob_hash
482        } else {
483            let net_desc = self
484                .read_network_description()
485                .await?
486                .ok_or(ExecutionError::NoNetworkDescriptionFound)?;
487            let event_id = EventId {
488                chain_id: net_desc.admin_chain_id,
489                stream_id: StreamId::system(linera_execution::system::EPOCH_STREAM_NAME),
490                index: epoch.0,
491            };
492            let Some(bytes) = self.read_event(event_id).await? else {
493                return Ok(None);
494            };
495            let event_data: linera_execution::system::EpochEventData = bcs::from_bytes(&bytes)?;
496            event_data.blob_hash
497        };
498        Ok(Some(self.get_or_load_committee_by_hash(blob_hash).await?))
499    }
500
501    /// Lists the blob IDs in storage.
502    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
503
504    /// Lists the chain IDs in storage.
505    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
506
507    /// Lists the event IDs in storage.
508    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
509}
510
511/// The result of processing the obtained read certificates.
512pub enum ResultReadCertificates {
513    /// All requested certificates were found.
514    Certificates(Vec<ConfirmedBlockCertificate>),
515    /// Some hashes did not correspond to a stored certificate.
516    InvalidHashes(Vec<CryptoHash>),
517}
518
519impl ResultReadCertificates {
520    /// Creating the processed read certificates.
521    pub fn new(
522        certificates: Vec<Option<Arc<ConfirmedBlockCertificate>>>,
523        hashes: Vec<CryptoHash>,
524    ) -> Self {
525        let (certificates, invalid_hashes) = certificates
526            .into_iter()
527            .zip(hashes)
528            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
529                Some(cert) => itertools::Either::Left(Arc::unwrap_or_clone(cert)),
530                None => itertools::Either::Right(hash),
531            });
532        if invalid_hashes.is_empty() {
533            Self::Certificates(certificates)
534        } else {
535            Self::InvalidHashes(invalid_hashes)
536        }
537    }
538}
539
/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
///
/// It delegates blob, event, network-description and committee lookups to the wrapped
/// storage and caches the loaded contract and service code per application.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    // The storage that lookups and bytecode loading are delegated to.
    storage: S,
    // The chain reported by `chain_id()`.
    chain_id: ChainId,
    // The thread pool exposed through `thread_pool()`.
    thread_pool: StdArc<linera_execution::ThreadPool>,
    // The configuration exposed through `execution_runtime_config()`.
    execution_runtime_config: ExecutionRuntimeConfig,
    // Contract code already loaded, keyed by application ID.
    user_contracts: StdArc<papaya::HashMap<ApplicationId, UserContractCode>>,
    // Service code already loaded, keyed by application ID.
    user_services: StdArc<papaya::HashMap<ApplicationId, UserServiceCode>>,
}
550
551#[cfg_attr(not(web), async_trait)]
552#[cfg_attr(web, async_trait(?Send))]
553impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
554    fn chain_id(&self) -> ChainId {
555        self.chain_id
556    }
557
558    fn thread_pool(&self) -> &StdArc<linera_execution::ThreadPool> {
559        &self.thread_pool
560    }
561
562    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
563        self.execution_runtime_config
564    }
565
566    fn user_contracts(&self) -> &StdArc<papaya::HashMap<ApplicationId, UserContractCode>> {
567        &self.user_contracts
568    }
569
570    fn user_services(&self) -> &StdArc<papaya::HashMap<ApplicationId, UserServiceCode>> {
571        &self.user_services
572    }
573
574    async fn get_user_contract(
575        &self,
576        description: &ApplicationDescription,
577        txn_tracker: &TransactionTracker,
578    ) -> Result<UserContractCode, ExecutionError> {
579        let application_id = description.into();
580        let pinned = self.user_contracts.pin_owned();
581        if let Some(contract) = pinned.get(&application_id) {
582            return Ok(contract.clone());
583        }
584        let contract = self.storage.load_contract(description, txn_tracker).await?;
585        pinned.insert(application_id, contract.clone());
586        Ok(contract)
587    }
588
589    async fn get_user_service(
590        &self,
591        description: &ApplicationDescription,
592        txn_tracker: &TransactionTracker,
593    ) -> Result<UserServiceCode, ExecutionError> {
594        let application_id = description.into();
595        let pinned = self.user_services.pin_owned();
596        if let Some(service) = pinned.get(&application_id) {
597            return Ok(service.clone());
598        }
599        let service = self.storage.load_service(description, txn_tracker).await?;
600        pinned.insert(application_id, service.clone());
601        Ok(service)
602    }
603
604    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<StdArc<Blob>>, ViewError> {
605        Ok(self.storage.read_blob(blob_id).await?.map(Arc::into_std))
606    }
607
608    async fn get_event(&self, event_id: EventId) -> Result<Option<StdArc<Vec<u8>>>, ViewError> {
609        Ok(self.storage.read_event(event_id).await?.map(Arc::into_std))
610    }
611
612    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
613        self.storage.read_network_description().await
614    }
615
616    async fn get_or_load_committee_by_hash(
617        &self,
618        hash: CryptoHash,
619    ) -> Result<StdArc<Committee>, ExecutionError> {
620        self.storage.get_or_load_committee_by_hash(hash).await
621    }
622
623    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
624        self.storage.contains_blob(blob_id).await
625    }
626
627    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
628        self.storage.contains_event(event_id).await
629    }
630
631    #[cfg(with_testing)]
632    async fn add_blobs(
633        &self,
634        blobs: impl IntoIterator<Item = Blob> + Send,
635    ) -> Result<(), ViewError> {
636        let blobs = Vec::from_iter(blobs);
637        self.storage.write_blobs(&blobs).await
638    }
639
640    #[cfg(with_testing)]
641    async fn add_events(
642        &self,
643        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
644    ) -> Result<(), ViewError> {
645        self.storage.write_events(events).await
646    }
647}
648
/// A clock that can be used to get the current `Timestamp` and to wait for a later one.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time.
    fn current_time(&self) -> Timestamp;

    /// Waits until the given timestamp is reached.
    async fn sleep_until(&self, timestamp: Timestamp);
}
659
660#[cfg(test)]
661mod tests {
662    use std::collections::BTreeMap;
663
664    use linera_base::{
665        crypto::{AccountPublicKey, CryptoHash},
666        data_types::{
667            Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
668            Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
669        },
670        identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
671        ownership::ChainOwnership,
672    };
673    use linera_chain::{
674        block::{Block, ConfirmedBlock},
675        data_types::{BlockExecutionOutcome, ProposedBlock},
676    };
677    use linera_execution::{BlobOrigin, BlobState};
678    #[cfg(feature = "scylladb")]
679    use linera_views::scylla_db::ScyllaDbDatabase;
680    use linera_views::{memory::MemoryDatabase, ViewError};
681    use test_case::test_case;
682
683    use super::*;
684    use crate::db_storage::DbStorage;
685
686    /// Generic test function to test Storage trait features
687    async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
688    where
689        S::Context: Send + Sync,
690    {
691        // Test clock functionality
692        let _current_time = storage.clock().current_time();
693        let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));
694
695        // Test loading a chain (this creates a chain state view)
696        let _chain_view = storage.load_chain(test_chain_id).await?;
697
698        // Test block exporter context
699        let _block_exporter_context = storage.block_exporter_context(0).await?;
700        Ok(())
701    }
702
703    async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
704    where
705        S::Context: Send + Sync,
706    {
707        // Create test blobs
708        let chain_description = ChainDescription::new(
709            ChainOrigin::Root(0),
710            InitialChainConfig {
711                ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
712                epoch: Epoch::ZERO,
713                balance: Amount::ZERO,
714                application_permissions: ApplicationPermissions::default(),
715            },
716            Timestamp::from(0),
717        );
718
719        let test_blob1 = Blob::new_chain_description(&chain_description);
720        let test_blob2 = Blob::new_data(vec![10, 20, 30]);
721        let test_blob3 = Blob::new_data(vec![40, 50, 60]);
722
723        // Testing blobs existence
724        let blob_id1 = test_blob1.id();
725        let blob_id2 = test_blob2.id();
726        let blob_id3 = test_blob3.id();
727
728        // Test blob existence before writing
729        assert!(!storage.contains_blob(blob_id1).await?);
730        assert!(!storage.contains_blob(blob_id2).await?);
731        assert!(!storage.contains_blob(blob_id3).await?);
732
733        // Test single blob write
734        storage.write_blob(&test_blob1).await?;
735        assert!(storage.contains_blob(blob_id1).await?);
736
737        // Test multiple blob write (write_blobs)
738        storage
739            .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
740            .await?;
741        assert!(storage.contains_blob(blob_id2).await?);
742        assert!(storage.contains_blob(blob_id3).await?);
743
744        // Test single blob read
745        let read_blob = storage.read_blob(blob_id1).await?;
746        assert_eq!(read_blob.as_deref(), Some(&test_blob1));
747
748        // Test multiple blob read (read_blobs)
749        let blob_ids = vec![blob_id1, blob_id2, blob_id3];
750        let read_blobs = storage.read_blobs(&blob_ids).await?;
751        assert_eq!(read_blobs.len(), 3);
752
753        // Verify each blob was read correctly
754        assert_eq!(read_blobs[0].as_deref(), Some(&test_blob1));
755        assert_eq!(read_blobs[1].as_deref(), Some(&test_blob2));
756        assert_eq!(read_blobs[2].as_deref(), Some(&test_blob3));
757
758        // Test missing blobs detection
759        let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
760        let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
761        assert_eq!(missing_blobs, vec![missing_blob_id]);
762
763        // Test maybe_write_blobs (should return false as blobs don't have blob states yet)
764        let write_results = storage
765            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
766            .await?;
767        assert_eq!(write_results, vec![false]);
768
769        // Test blob state operations
770        let blob_state1 = BlobState {
771            origin: BlobOrigin::Published {
772                chain_id: ChainId(CryptoHash::test_hash("chain1")),
773                block_height: BlockHeight(0),
774            },
775            last_used_by: None,
776            epoch: Some(Epoch::ZERO),
777        };
778        let blob_state2 = BlobState {
779            origin: BlobOrigin::Published {
780                chain_id: ChainId(CryptoHash::test_hash("chain2")),
781                block_height: BlockHeight(1),
782            },
783            last_used_by: Some(CryptoHash::test_hash("cert")),
784            epoch: Some(Epoch::from(1)),
785        };
786
787        // Test blob state existence before writing
788        assert!(!storage.contains_blob_state(blob_id1).await?);
789        assert!(!storage.contains_blob_state(blob_id2).await?);
790
791        // Test blob state writing
792        storage
793            .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
794            .await?;
795        storage
796            .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
797            .await?;
798
799        // Test blob state existence after writing
800        assert!(storage.contains_blob_state(blob_id1).await?);
801        assert!(storage.contains_blob_state(blob_id2).await?);
802
803        // Test single blob state read
804        let read_blob_state = storage.read_blob_state(blob_id1).await?;
805        assert_eq!(read_blob_state, Some(blob_state1.clone()));
806
807        // Test multiple blob state read (read_blob_states)
808        let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
809        assert_eq!(read_blob_states.len(), 2);
810
811        // Verify blob states
812        assert_eq!(read_blob_states[0], Some(blob_state1));
813        assert_eq!(read_blob_states[1], Some(blob_state2));
814
815        // Test maybe_write_blobs now that blob states exist (should return true)
816        let write_results = storage
817            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
818            .await?;
819        assert_eq!(write_results, vec![true]);
820
821        Ok(())
822    }
823
824    async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
825    where
826        S::Context: Send + Sync,
827    {
828        let cert_hash = CryptoHash::test_hash("certificate");
829
830        // Test certificate existence (should be false initially)
831        assert!(!storage.contains_certificate(cert_hash).await?);
832
833        // Test reading non-existent certificate
834        assert!(storage.read_certificate(cert_hash).await?.is_none());
835
836        // Test reading multiple certificates
837        let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
838        let certs_result = storage.read_certificates(&cert_hashes).await?;
839        assert_eq!(certs_result.len(), 2);
840        assert!(certs_result[0].is_none());
841        assert!(certs_result[1].is_none());
842
843        // Test raw certificate reading
844        let raw_certs_result = storage.read_certificates_raw(&cert_hashes).await?;
845        assert!(raw_certs_result.iter().all(|cert| cert.is_none())); // No certificates exist
846
847        // Test confirmed block reading
848        let block_hash = CryptoHash::test_hash("block");
849        let block_result = storage.read_confirmed_block(block_hash).await?;
850        assert!(block_result.is_none());
851
852        // Test write_blobs_and_certificate functionality
853        // Create test blobs
854        let test_blob1 = Blob::new_data(vec![1, 2, 3]);
855        let test_blob2 = Blob::new_data(vec![4, 5, 6]);
856        let blobs = vec![test_blob1, test_blob2];
857
858        // Create a test certificate using the working pattern from linera-indexer tests
859        let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));
860
861        // Create a minimal proposed block (genesis block)
862        let proposed_block = ProposedBlock {
863            epoch: Epoch::ZERO,
864            chain_id,
865            transactions: vec![],
866            previous_block_hash: None,
867            height: BlockHeight::ZERO,
868            authenticated_owner: None,
869            timestamp: Timestamp::default(),
870        };
871
872        // Create a minimal block execution outcome with proper BTreeMap types
873        let outcome = BlockExecutionOutcome {
874            messages: vec![],
875            state_hash: CryptoHash::default(),
876            oracle_responses: vec![],
877            events: vec![],
878            blobs: vec![],
879            operation_results: vec![],
880            previous_event_blocks: BTreeMap::new(),
881            previous_message_blocks: BTreeMap::new(),
882        };
883
884        let block = Block::new(proposed_block, outcome);
885        let confirmed_block = ConfirmedBlock::new(block);
886        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
887
888        // Test writing blobs and certificate together
889        storage
890            .write_blobs_and_certificate(&blobs, &certificate)
891            .await?;
892
893        // Verify the certificate was written
894        let cert_hash = certificate.hash();
895        assert!(storage.contains_certificate(cert_hash).await?);
896
897        // Verify the certificate can be read back
898        let read_certificate = storage.read_certificate(cert_hash).await?;
899        assert!(read_certificate.is_some());
900        assert_eq!(read_certificate.unwrap().hash(), cert_hash);
901
902        // Verify the blobs were written
903        for blob in &blobs {
904            assert!(storage.contains_blob(blob.id()).await?);
905        }
906
907        Ok(())
908    }
909
910    async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
911    where
912        S::Context: Send + Sync,
913    {
914        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
915        let stream_id = StreamId::system("test_stream");
916
917        // Test multiple events
918        let event_id1 = EventId {
919            chain_id,
920            stream_id: stream_id.clone(),
921            index: 0,
922        };
923        let event_id2 = EventId {
924            chain_id,
925            stream_id: stream_id.clone(),
926            index: 1,
927        };
928        let event_id3 = EventId {
929            chain_id,
930            stream_id: stream_id.clone(),
931            index: 2,
932        };
933
934        let event_data1 = vec![1, 2, 3];
935        let event_data2 = vec![4, 5, 6];
936        let event_data3 = vec![7, 8, 9];
937
938        // Test event existence before writing
939        assert!(!storage.contains_event(event_id1.clone()).await?);
940        assert!(!storage.contains_event(event_id2.clone()).await?);
941
942        // Write multiple events
943        storage
944            .write_events([
945                (event_id1.clone(), event_data1.clone()),
946                (event_id2.clone(), event_data2.clone()),
947                (event_id3.clone(), event_data3.clone()),
948            ])
949            .await?;
950
951        // Test event existence after writing
952        assert!(storage.contains_event(event_id1.clone()).await?);
953        assert!(storage.contains_event(event_id2.clone()).await?);
954        assert!(storage.contains_event(event_id3.clone()).await?);
955
956        // Test individual event reading
957        let read_event1 = storage.read_event(event_id1).await?;
958        assert_eq!(read_event1.as_deref(), Some(&event_data1));
959
960        let read_event2 = storage.read_event(event_id2).await?;
961        assert_eq!(read_event2.as_deref(), Some(&event_data2));
962
963        // Test reading events from index
964        let events_from_index = storage
965            .read_events_from_index(&chain_id, &stream_id, 1)
966            .await?;
967        assert!(events_from_index.len() >= 2); // Should contain events at index 1 and 2
968        Ok(())
969    }
970
971    async fn test_storage_network_description<S: Storage + Sync>(
972        storage: &S,
973    ) -> Result<(), ViewError>
974    where
975        S::Context: Send + Sync,
976    {
977        let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));
978
979        let network_desc = NetworkDescription {
980            name: "test_network".to_string(),
981            genesis_config_hash: CryptoHash::test_hash("genesis_config"),
982            genesis_timestamp: Timestamp::from(0),
983            genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
984            admin_chain_id,
985        };
986
987        // Test reading non-existent network description
988        assert!(storage.read_network_description().await?.is_none());
989
990        // Write network description
991        storage.write_network_description(&network_desc).await?;
992
993        // Test reading existing network description
994        let read_desc = storage.read_network_description().await?;
995        assert_eq!(read_desc, Some(network_desc));
996
997        Ok(())
998    }
999
1000    /// Generic test function to test Storage trait features
1001    #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
1002    #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
1003    #[test_log::test(tokio::test)]
1004    async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
1005    where
1006        S::Context: Send + Sync,
1007    {
1008        test_storage_chain_exporter(&storage).await?;
1009        test_storage_blob(&storage).await?;
1010        test_storage_certificate(&storage).await?;
1011        test_storage_event(&storage).await?;
1012        test_storage_network_description(&storage).await?;
1013        Ok(())
1014    }
1015}