linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6mod db_storage;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13    crypto::CryptoHash,
14    data_types::{
15        ApplicationDescription, Blob, ChainDescription, CompressedBytecode, NetworkDescription,
16        TimeDelta, Timestamp,
17    },
18    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
19    vm::VmRuntime,
20};
21use linera_chain::{
22    types::{ConfirmedBlock, ConfirmedBlockCertificate},
23    ChainError, ChainStateView,
24};
25#[cfg(with_revm)]
26use linera_execution::{
27    evm::revm::{EvmContractModule, EvmServiceModule},
28    EvmRuntime,
29};
30use linera_execution::{
31    BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker,
32    UserContractCode, UserServiceCode, WasmRuntime,
33};
34#[cfg(with_wasm_runtime)]
35use linera_execution::{WasmContractModule, WasmServiceModule};
36use linera_views::{context::Context, views::RootView, ViewError};
37
38#[cfg(with_metrics)]
39pub use crate::db_storage::metrics;
40#[cfg(with_testing)]
41pub use crate::db_storage::TestClock;
42pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
43
44/// The default namespace to be used when none is specified
45pub const DEFAULT_NAMESPACE: &str = "default";
46
47/// Communicate with a persistent storage using the "views" abstraction.
48#[cfg_attr(not(web), async_trait)]
49#[cfg_attr(web, async_trait(?Send))]
50pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
51    /// The low-level storage implementation in use by the core protocol (chain workers etc).
52    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
53
54    /// The clock type being used.
55    type Clock: Clock;
56
57    /// The low-level storage implementation in use by the block exporter.
58    type BlockExporterContext: Context<Extra = u32> + Clone;
59
60    /// Returns the current wall clock time.
61    fn clock(&self) -> &Self::Clock;
62
63    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
64
65    /// Loads the view of a chain state.
66    ///
67    /// # Notes
68    ///
69    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
70    /// instances of the same chain active at any given moment, they will race to access persistent
71    /// storage. This can lead to invalid states and data corruption.
72    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
73
74    /// Tests the existence of a blob with the given blob ID.
75    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
76
77    /// Returns what blobs from the input are missing from storage.
78    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
79
80    /// Tests existence of a blob state with the given blob ID.
81    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
82
83    /// Reads the hashed certificate value with the given hash.
84    async fn read_confirmed_block(
85        &self,
86        hash: CryptoHash,
87    ) -> Result<Option<ConfirmedBlock>, ViewError>;
88
89    /// Reads the blob with the given blob ID.
90    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
91
92    /// Reads the blobs with the given blob IDs.
93    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
94
95    /// Reads the blob state with the given blob ID.
96    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
97
98    /// Reads the blob states with the given blob IDs.
99    async fn read_blob_states(
100        &self,
101        blob_ids: &[BlobId],
102    ) -> Result<Vec<Option<BlobState>>, ViewError>;
103
104    /// Writes the given blob.
105    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
106
107    /// Writes blobs and certificate
108    async fn write_blobs_and_certificate(
109        &self,
110        blobs: &[Blob],
111        certificate: &ConfirmedBlockCertificate,
112    ) -> Result<(), ViewError>;
113
114    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
115    /// blobs that were written.
116    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
117
118    /// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
119    async fn maybe_write_blob_states(
120        &self,
121        blob_ids: &[BlobId],
122        blob_state: BlobState,
123    ) -> Result<(), ViewError>;
124
125    /// Writes several blobs.
126    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
127
128    /// Tests existence of the certificate with the given hash.
129    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
130
131    /// Reads the certificate with the given hash.
132    async fn read_certificate(
133        &self,
134        hash: CryptoHash,
135    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
136
137    /// Reads a number of certificates
138    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
139        &self,
140        hashes: I,
141    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
142
143    /// Reads certificates by hashes.
144    ///
145    /// Returns a vector of tuples where the first element is a lite certificate
146    /// and the second element is confirmed block.
147    ///
148    /// It does not check if all hashes all returned.
149    async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
150        &self,
151        hashes: I,
152    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError>;
153
154    /// Reads the event with the given ID.
155    async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
156
157    /// Tests existence of the event with the given ID.
158    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
159
160    /// Lists all the events from a starting index
161    async fn read_events_from_index(
162        &self,
163        chain_id: &ChainId,
164        stream_id: &StreamId,
165        start_index: u32,
166    ) -> Result<Vec<IndexAndEvent>, ViewError>;
167
168    /// Writes a vector of events.
169    async fn write_events(
170        &self,
171        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
172    ) -> Result<(), ViewError>;
173
174    /// Reads the network description.
175    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
176
177    /// Writes the network description.
178    async fn write_network_description(
179        &self,
180        information: &NetworkDescription,
181    ) -> Result<(), ViewError>;
182
183    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
184    ///
185    /// # Notes
186    ///
187    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
188    /// the same chain active at any given moment, they will race to access persistent storage.
189    /// This can lead to invalid states and data corruption.
190    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
191    where
192        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
193    {
194        let id = description.id();
195        // Store the description blob.
196        self.write_blob(&Blob::new_chain_description(&description))
197            .await?;
198        let mut chain = self.load_chain(id).await?;
199        assert!(!chain.is_active(), "Attempting to create a chain twice");
200        let current_time = self.clock().current_time();
201        chain.initialize_if_needed(current_time).await?;
202        chain.save().await?;
203        Ok(())
204    }
205
206    /// Selects the WebAssembly runtime to use for applications (if any).
207    fn wasm_runtime(&self) -> Option<WasmRuntime>;
208
209    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
210    /// by the `application_description`.
211    async fn load_contract(
212        &self,
213        application_description: &ApplicationDescription,
214        txn_tracker: &TransactionTracker,
215    ) -> Result<UserContractCode, ExecutionError> {
216        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
217        let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
218            Some(content) => content.clone(),
219            None => self
220                .read_blob(contract_bytecode_blob_id)
221                .await?
222                .ok_or(ExecutionError::BlobsNotFound(vec![
223                    contract_bytecode_blob_id,
224                ]))?
225                .into_content(),
226        };
227        let compressed_contract_bytecode = CompressedBytecode {
228            compressed_bytes: content.into_arc_bytes(),
229        };
230        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
231        let contract_bytecode = self
232            .thread_pool()
233            .run_send((), move |()| async move {
234                compressed_contract_bytecode.decompress()
235            })
236            .await
237            .await??;
238        match application_description.module_id.vm_runtime {
239            VmRuntime::Wasm => {
240                cfg_if::cfg_if! {
241                    if #[cfg(with_wasm_runtime)] {
242                        let Some(wasm_runtime) = self.wasm_runtime() else {
243                            panic!("A Wasm runtime is required to load user applications.");
244                        };
245                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
246                           .await?
247                           .into())
248                    } else {
249                        panic!(
250                            "A Wasm runtime is required to load user applications. \
251                             Please enable the `wasmer` or the `wasmtime` feature flags \
252                             when compiling `linera-storage`."
253                        );
254                    }
255                }
256            }
257            VmRuntime::Evm => {
258                cfg_if::cfg_if! {
259                    if #[cfg(with_revm)] {
260                        let evm_runtime = EvmRuntime::Revm;
261                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
262                           .into())
263                    } else {
264                        panic!(
265                            "An Evm runtime is required to load user applications. \
266                             Please enable the `revm` feature flag \
267                             when compiling `linera-storage`."
268                        );
269                    }
270                }
271            }
272        }
273    }
274
275    /// Creates a [`linera-sdk::UserContract`] instance using the bytecode in storage referenced
276    /// by the `application_description`.
277    async fn load_service(
278        &self,
279        application_description: &ApplicationDescription,
280        txn_tracker: &TransactionTracker,
281    ) -> Result<UserServiceCode, ExecutionError> {
282        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
283        let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
284            Some(content) => content.clone(),
285            None => self
286                .read_blob(service_bytecode_blob_id)
287                .await?
288                .ok_or(ExecutionError::BlobsNotFound(vec![
289                    service_bytecode_blob_id,
290                ]))?
291                .into_content(),
292        };
293        let compressed_service_bytecode = CompressedBytecode {
294            compressed_bytes: content.into_arc_bytes(),
295        };
296        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
297        let service_bytecode = self
298            .thread_pool()
299            .run_send((), move |()| async move {
300                compressed_service_bytecode.decompress()
301            })
302            .await
303            .await??;
304        match application_description.module_id.vm_runtime {
305            VmRuntime::Wasm => {
306                cfg_if::cfg_if! {
307                    if #[cfg(with_wasm_runtime)] {
308                        let Some(wasm_runtime) = self.wasm_runtime() else {
309                            panic!("A Wasm runtime is required to load user applications.");
310                        };
311                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
312                           .await?
313                           .into())
314                    } else {
315                        panic!(
316                            "A Wasm runtime is required to load user applications. \
317                             Please enable the `wasmer` or the `wasmtime` feature flags \
318                             when compiling `linera-storage`."
319                        );
320                    }
321                }
322            }
323            VmRuntime::Evm => {
324                cfg_if::cfg_if! {
325                    if #[cfg(with_revm)] {
326                        let evm_runtime = EvmRuntime::Revm;
327                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
328                           .into())
329                    } else {
330                        panic!(
331                            "An Evm runtime is required to load user applications. \
332                             Please enable the `revm` feature flag \
333                             when compiling `linera-storage`."
334                        );
335                    }
336                }
337            }
338        }
339    }
340
341    async fn block_exporter_context(
342        &self,
343        block_exporter_id: u32,
344    ) -> Result<Self::BlockExporterContext, ViewError>;
345
346    /// Lists the blob IDs in storage.
347    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
348
349    /// Lists the chain IDs in storage.
350    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
351
352    /// Lists the event IDs in storage.
353    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
354}
355
356/// The result of processing the obtained read certificates.
357pub enum ResultReadCertificates {
358    Certificates(Vec<ConfirmedBlockCertificate>),
359    InvalidHashes(Vec<CryptoHash>),
360}
361
362impl ResultReadCertificates {
363    /// Creating the processed read certificates.
364    pub fn new(
365        certificates: Vec<Option<ConfirmedBlockCertificate>>,
366        hashes: Vec<CryptoHash>,
367    ) -> Self {
368        let (certificates, invalid_hashes) = certificates
369            .into_iter()
370            .zip(hashes)
371            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
372                Some(cert) => itertools::Either::Left(cert),
373                None => itertools::Either::Right(hash),
374            });
375        if invalid_hashes.is_empty() {
376            Self::Certificates(certificates)
377        } else {
378            Self::InvalidHashes(invalid_hashes)
379        }
380    }
381}
382
383/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
384#[derive(Clone)]
385pub struct ChainRuntimeContext<S> {
386    storage: S,
387    chain_id: ChainId,
388    thread_pool: Arc<linera_execution::ThreadPool>,
389    execution_runtime_config: ExecutionRuntimeConfig,
390    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
391    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
392}
393
394#[cfg_attr(not(web), async_trait)]
395#[cfg_attr(web, async_trait(?Send))]
396impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
397    fn chain_id(&self) -> ChainId {
398        self.chain_id
399    }
400
401    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
402        &self.thread_pool
403    }
404
405    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
406        self.execution_runtime_config
407    }
408
409    fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
410        &self.user_contracts
411    }
412
413    fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
414        &self.user_services
415    }
416
417    async fn get_user_contract(
418        &self,
419        description: &ApplicationDescription,
420        txn_tracker: &TransactionTracker,
421    ) -> Result<UserContractCode, ExecutionError> {
422        let application_id = description.into();
423        let pinned = self.user_contracts.pin_owned();
424        if let Some(contract) = pinned.get(&application_id) {
425            return Ok(contract.clone());
426        }
427        let contract = self.storage.load_contract(description, txn_tracker).await?;
428        pinned.insert(application_id, contract.clone());
429        Ok(contract)
430    }
431
432    async fn get_user_service(
433        &self,
434        description: &ApplicationDescription,
435        txn_tracker: &TransactionTracker,
436    ) -> Result<UserServiceCode, ExecutionError> {
437        let application_id = description.into();
438        let pinned = self.user_services.pin_owned();
439        if let Some(service) = pinned.get(&application_id) {
440            return Ok(service.clone());
441        }
442        let service = self.storage.load_service(description, txn_tracker).await?;
443        pinned.insert(application_id, service.clone());
444        Ok(service)
445    }
446
447    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
448        self.storage.read_blob(blob_id).await
449    }
450
451    async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
452        self.storage.read_event(event_id).await
453    }
454
455    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
456        self.storage.read_network_description().await
457    }
458
459    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
460        self.storage.contains_blob(blob_id).await
461    }
462
463    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
464        self.storage.contains_event(event_id).await
465    }
466
467    #[cfg(with_testing)]
468    async fn add_blobs(
469        &self,
470        blobs: impl IntoIterator<Item = Blob> + Send,
471    ) -> Result<(), ViewError> {
472        let blobs = Vec::from_iter(blobs);
473        self.storage.write_blobs(&blobs).await
474    }
475
476    #[cfg(with_testing)]
477    async fn add_events(
478        &self,
479        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
480    ) -> Result<(), ViewError> {
481        self.storage.write_events(events).await
482    }
483}
484
485/// A clock that can be used to get the current `Timestamp`.
486#[cfg_attr(not(web), async_trait)]
487#[cfg_attr(web, async_trait(?Send))]
488pub trait Clock {
489    fn current_time(&self) -> Timestamp;
490
491    async fn sleep(&self, delta: TimeDelta);
492
493    async fn sleep_until(&self, timestamp: Timestamp);
494}
495
496#[cfg(test)]
497mod tests {
498    use std::collections::BTreeMap;
499
500    use linera_base::{
501        crypto::{AccountPublicKey, CryptoHash},
502        data_types::{
503            Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
504            Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
505        },
506        identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
507        ownership::ChainOwnership,
508    };
509    use linera_chain::{
510        block::{Block, ConfirmedBlock},
511        data_types::{BlockExecutionOutcome, ProposedBlock},
512    };
513    use linera_execution::BlobState;
514    #[cfg(feature = "dynamodb")]
515    use linera_views::dynamo_db::DynamoDbDatabase;
516    #[cfg(feature = "scylladb")]
517    use linera_views::scylla_db::ScyllaDbDatabase;
518    use linera_views::{memory::MemoryDatabase, ViewError};
519    use test_case::test_case;
520
521    use super::*;
522    use crate::db_storage::DbStorage;
523
524    /// Generic test function to test Storage trait features
525    async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
526    where
527        S::Context: Send + Sync,
528    {
529        // Test clock functionality
530        let _current_time = storage.clock().current_time();
531        let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));
532
533        // Test loading a chain (this creates a chain state view)
534        let _chain_view = storage.load_chain(test_chain_id).await?;
535
536        // Test block exporter context
537        let _block_exporter_context = storage.block_exporter_context(0).await?;
538        Ok(())
539    }
540
541    async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
542    where
543        S::Context: Send + Sync,
544    {
545        // Create test blobs
546        let chain_description = ChainDescription::new(
547            ChainOrigin::Root(0),
548            InitialChainConfig {
549                ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
550                epoch: Epoch::ZERO,
551                min_active_epoch: Epoch::ZERO,
552                max_active_epoch: Epoch::ZERO,
553                balance: Amount::ZERO,
554                application_permissions: ApplicationPermissions::default(),
555            },
556            Timestamp::from(0),
557        );
558
559        let test_blob1 = Blob::new_chain_description(&chain_description);
560        let test_blob2 = Blob::new_data(vec![10, 20, 30]);
561        let test_blob3 = Blob::new_data(vec![40, 50, 60]);
562
563        // Testing blobs existence
564        let blob_id1 = test_blob1.id();
565        let blob_id2 = test_blob2.id();
566        let blob_id3 = test_blob3.id();
567
568        // Test blob existence before writing
569        assert!(!storage.contains_blob(blob_id1).await?);
570        assert!(!storage.contains_blob(blob_id2).await?);
571        assert!(!storage.contains_blob(blob_id3).await?);
572
573        // Test single blob write
574        storage.write_blob(&test_blob1).await?;
575        assert!(storage.contains_blob(blob_id1).await?);
576
577        // Test multiple blob write (write_blobs)
578        storage
579            .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
580            .await?;
581        assert!(storage.contains_blob(blob_id2).await?);
582        assert!(storage.contains_blob(blob_id3).await?);
583
584        // Test single blob read
585        let read_blob = storage.read_blob(blob_id1).await?;
586        assert_eq!(read_blob, Some(test_blob1.clone()));
587
588        // Test multiple blob read (read_blobs)
589        let blob_ids = vec![blob_id1, blob_id2, blob_id3];
590        let read_blobs = storage.read_blobs(&blob_ids).await?;
591        assert_eq!(read_blobs.len(), 3);
592
593        // Verify each blob was read correctly
594        assert_eq!(read_blobs[0], Some(test_blob1.clone()));
595        assert_eq!(read_blobs[1], Some(test_blob2));
596        assert_eq!(read_blobs[2], Some(test_blob3));
597
598        // Test missing blobs detection
599        let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
600        let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
601        assert_eq!(missing_blobs, vec![missing_blob_id]);
602
603        // Test maybe_write_blobs (should return false as blobs don't have blob states yet)
604        let write_results = storage.maybe_write_blobs(&[test_blob1.clone()]).await?;
605        assert_eq!(write_results, vec![false]);
606
607        // Test blob state operations
608        let blob_state1 = BlobState {
609            last_used_by: None,
610            chain_id: ChainId(CryptoHash::test_hash("chain1")),
611            block_height: BlockHeight(0),
612            epoch: Some(Epoch::ZERO),
613        };
614        let blob_state2 = BlobState {
615            last_used_by: Some(CryptoHash::test_hash("cert")),
616            chain_id: ChainId(CryptoHash::test_hash("chain2")),
617            block_height: BlockHeight(1),
618            epoch: Some(Epoch::from(1)),
619        };
620
621        // Test blob state existence before writing
622        assert!(!storage.contains_blob_state(blob_id1).await?);
623        assert!(!storage.contains_blob_state(blob_id2).await?);
624
625        // Test blob state writing
626        storage
627            .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
628            .await?;
629        storage
630            .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
631            .await?;
632
633        // Test blob state existence after writing
634        assert!(storage.contains_blob_state(blob_id1).await?);
635        assert!(storage.contains_blob_state(blob_id2).await?);
636
637        // Test single blob state read
638        let read_blob_state = storage.read_blob_state(blob_id1).await?;
639        assert_eq!(read_blob_state, Some(blob_state1.clone()));
640
641        // Test multiple blob state read (read_blob_states)
642        let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
643        assert_eq!(read_blob_states.len(), 2);
644
645        // Verify blob states
646        assert_eq!(read_blob_states[0], Some(blob_state1));
647        assert_eq!(read_blob_states[1], Some(blob_state2));
648
649        // Test maybe_write_blobs now that blob states exist (should return true)
650        let write_results = storage.maybe_write_blobs(&[test_blob1.clone()]).await?;
651        assert_eq!(write_results, vec![true]);
652
653        Ok(())
654    }
655
656    async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
657    where
658        S::Context: Send + Sync,
659    {
660        let cert_hash = CryptoHash::test_hash("certificate");
661
662        // Test certificate existence (should be false initially)
663        assert!(!storage.contains_certificate(cert_hash).await?);
664
665        // Test reading non-existent certificate
666        assert!(storage.read_certificate(cert_hash).await?.is_none());
667
668        // Test reading multiple certificates
669        let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
670        let certs_result = storage.read_certificates(cert_hashes.clone()).await?;
671        assert_eq!(certs_result.len(), 2);
672        assert!(certs_result[0].is_none());
673        assert!(certs_result[1].is_none());
674
675        // Test raw certificate reading
676        let raw_certs_result = storage.read_certificates_raw(cert_hashes).await?;
677        assert!(raw_certs_result.is_empty()); // No certificates exist
678
679        // Test confirmed block reading
680        let block_hash = CryptoHash::test_hash("block");
681        let block_result = storage.read_confirmed_block(block_hash).await?;
682        assert!(block_result.is_none());
683
684        // Test write_blobs_and_certificate functionality
685        // Create test blobs
686        let test_blob1 = Blob::new_data(vec![1, 2, 3]);
687        let test_blob2 = Blob::new_data(vec![4, 5, 6]);
688        let blobs = vec![test_blob1, test_blob2];
689
690        // Create a test certificate using the working pattern from linera-indexer tests
691        let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));
692
693        // Create a minimal proposed block (genesis block)
694        let proposed_block = ProposedBlock {
695            epoch: Epoch::ZERO,
696            chain_id,
697            transactions: vec![],
698            previous_block_hash: None,
699            height: BlockHeight::ZERO,
700            authenticated_owner: None,
701            timestamp: Timestamp::default(),
702        };
703
704        // Create a minimal block execution outcome with proper BTreeMap types
705        let outcome = BlockExecutionOutcome {
706            messages: vec![],
707            state_hash: CryptoHash::default(),
708            oracle_responses: vec![],
709            events: vec![],
710            blobs: vec![],
711            operation_results: vec![],
712            previous_event_blocks: BTreeMap::new(),
713            previous_message_blocks: BTreeMap::new(),
714        };
715
716        let block = Block::new(proposed_block, outcome);
717        let confirmed_block = ConfirmedBlock::new(block);
718        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
719
720        // Test writing blobs and certificate together
721        storage
722            .write_blobs_and_certificate(&blobs, &certificate)
723            .await?;
724
725        // Verify the certificate was written
726        let cert_hash = certificate.hash();
727        assert!(storage.contains_certificate(cert_hash).await?);
728
729        // Verify the certificate can be read back
730        let read_certificate = storage.read_certificate(cert_hash).await?;
731        assert!(read_certificate.is_some());
732        assert_eq!(read_certificate.unwrap().hash(), cert_hash);
733
734        // Verify the blobs were written
735        for blob in &blobs {
736            assert!(storage.contains_blob(blob.id()).await?);
737        }
738
739        Ok(())
740    }
741
742    async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
743    where
744        S::Context: Send + Sync,
745    {
746        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
747        let stream_id = StreamId::system("test_stream");
748
749        // Test multiple events
750        let event_id1 = EventId {
751            chain_id,
752            stream_id: stream_id.clone(),
753            index: 0,
754        };
755        let event_id2 = EventId {
756            chain_id,
757            stream_id: stream_id.clone(),
758            index: 1,
759        };
760        let event_id3 = EventId {
761            chain_id,
762            stream_id: stream_id.clone(),
763            index: 2,
764        };
765
766        let event_data1 = vec![1, 2, 3];
767        let event_data2 = vec![4, 5, 6];
768        let event_data3 = vec![7, 8, 9];
769
770        // Test event existence before writing
771        assert!(!storage.contains_event(event_id1.clone()).await?);
772        assert!(!storage.contains_event(event_id2.clone()).await?);
773
774        // Write multiple events
775        storage
776            .write_events([
777                (event_id1.clone(), event_data1.clone()),
778                (event_id2.clone(), event_data2.clone()),
779                (event_id3.clone(), event_data3.clone()),
780            ])
781            .await?;
782
783        // Test event existence after writing
784        assert!(storage.contains_event(event_id1.clone()).await?);
785        assert!(storage.contains_event(event_id2.clone()).await?);
786        assert!(storage.contains_event(event_id3.clone()).await?);
787
788        // Test individual event reading
789        let read_event1 = storage.read_event(event_id1).await?;
790        assert_eq!(read_event1, Some(event_data1));
791
792        let read_event2 = storage.read_event(event_id2).await?;
793        assert_eq!(read_event2, Some(event_data2));
794
795        // Test reading events from index
796        let events_from_index = storage
797            .read_events_from_index(&chain_id, &stream_id, 1)
798            .await?;
799        assert!(events_from_index.len() >= 2); // Should contain events at index 1 and 2
800        Ok(())
801    }
802
803    async fn test_storage_network_description<S: Storage + Sync>(
804        storage: &S,
805    ) -> Result<(), ViewError>
806    where
807        S::Context: Send + Sync,
808    {
809        let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));
810
811        let network_desc = NetworkDescription {
812            name: "test_network".to_string(),
813            genesis_config_hash: CryptoHash::test_hash("genesis_config"),
814            genesis_timestamp: Timestamp::from(0),
815            genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
816            admin_chain_id,
817        };
818
819        // Test reading non-existent network description
820        assert!(storage.read_network_description().await?.is_none());
821
822        // Write network description
823        storage.write_network_description(&network_desc).await?;
824
825        // Test reading existing network description
826        let read_desc = storage.read_network_description().await?;
827        assert_eq!(read_desc, Some(network_desc));
828
829        Ok(())
830    }
831
832    /// Generic test function to test Storage trait features
833    #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
834    #[cfg_attr(feature = "dynamodb", test_case(DbStorage::<DynamoDbDatabase, _>::make_test_storage(None).await; "dynamo_db"))]
835    #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
836    #[test_log::test(tokio::test)]
837    async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
838    where
839        S::Context: Send + Sync,
840    {
841        test_storage_chain_exporter(&storage).await?;
842        test_storage_blob(&storage).await?;
843        test_storage_certificate(&storage).await?;
844        test_storage_event(&storage).await?;
845        test_storage_network_description(&storage).await?;
846        Ok(())
847    }
848}