linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6mod db_storage;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13    crypto::CryptoHash,
14    data_types::{
15        ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode,
16        NetworkDescription, TimeDelta, Timestamp,
17    },
18    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
19    vm::VmRuntime,
20};
21use linera_chain::{
22    types::{ConfirmedBlock, ConfirmedBlockCertificate},
23    ChainError, ChainStateView,
24};
25#[cfg(with_revm)]
26use linera_execution::{
27    evm::revm::{EvmContractModule, EvmServiceModule},
28    EvmRuntime,
29};
30use linera_execution::{
31    BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker,
32    UserContractCode, UserServiceCode, WasmRuntime,
33};
34#[cfg(with_wasm_runtime)]
35use linera_execution::{WasmContractModule, WasmServiceModule};
36use linera_views::{context::Context, views::RootView, ViewError};
37
38#[cfg(with_metrics)]
39pub use crate::db_storage::metrics;
40#[cfg(with_testing)]
41pub use crate::db_storage::TestClock;
42pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
43
44/// The default namespace to be used when none is specified
45pub const DEFAULT_NAMESPACE: &str = "default";
46
47/// Communicate with a persistent storage using the "views" abstraction.
48#[cfg_attr(not(web), async_trait)]
49#[cfg_attr(web, async_trait(?Send))]
50pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
51    /// The low-level storage implementation in use by the core protocol (chain workers etc).
52    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
53
54    /// The clock type being used.
55    type Clock: Clock + Clone + Send + Sync;
56
57    /// The low-level storage implementation in use by the block exporter.
58    type BlockExporterContext: Context<Extra = u32> + Clone;
59
60    /// Returns the current wall clock time.
61    fn clock(&self) -> &Self::Clock;
62
63    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
64
65    /// Loads the view of a chain state.
66    ///
67    /// # Notes
68    ///
69    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
70    /// instances of the same chain active at any given moment, they will race to access persistent
71    /// storage. This can lead to invalid states and data corruption.
72    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
73
74    /// Tests the existence of a blob with the given blob ID.
75    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
76
77    /// Returns what blobs from the input are missing from storage.
78    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
79
80    /// Tests existence of a blob state with the given blob ID.
81    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
82
83    /// Reads the hashed certificate value with the given hash.
84    async fn read_confirmed_block(
85        &self,
86        hash: CryptoHash,
87    ) -> Result<Option<ConfirmedBlock>, ViewError>;
88
89    /// Reads a number of confirmed blocks by their hashes.
90    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
91        &self,
92        hashes: I,
93    ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError>;
94
95    /// Reads the blob with the given blob ID.
96    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
97
98    /// Reads the blobs with the given blob IDs.
99    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
100
101    /// Reads the blob state with the given blob ID.
102    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
103
104    /// Reads the blob states with the given blob IDs.
105    async fn read_blob_states(
106        &self,
107        blob_ids: &[BlobId],
108    ) -> Result<Vec<Option<BlobState>>, ViewError>;
109
110    /// Writes the given blob.
111    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
112
113    /// Writes blobs and certificate
114    async fn write_blobs_and_certificate(
115        &self,
116        blobs: &[Blob],
117        certificate: &ConfirmedBlockCertificate,
118    ) -> Result<(), ViewError>;
119
120    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
121    /// blobs that were written.
122    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
123
124    /// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
125    async fn maybe_write_blob_states(
126        &self,
127        blob_ids: &[BlobId],
128        blob_state: BlobState,
129    ) -> Result<(), ViewError>;
130
131    /// Writes several blobs.
132    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
133
134    /// Tests existence of the certificate with the given hash.
135    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
136
137    /// Reads the certificate with the given hash.
138    async fn read_certificate(
139        &self,
140        hash: CryptoHash,
141    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
142
143    /// Reads a number of certificates
144    async fn read_certificates(
145        &self,
146        hashes: &[CryptoHash],
147    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
148
149    /// Reads raw certificate bytes by hashes.
150    ///
151    /// Returns a vector where each element corresponds to the input hash.
152    /// Elements are `None` if no certificate exists for that hash.
153    /// Each found certificate is returned as `Some((lite_certificate_bytes, confirmed_block_bytes))`.
154    async fn read_certificates_raw(
155        &self,
156        hashes: &[CryptoHash],
157    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
158
159    /// Reads certificates by heights for a given chain.
160    /// Returns a vector where each element corresponds to the input height.
161    /// Elements are `None` if no certificate exists at that height.
162    async fn read_certificates_by_heights(
163        &self,
164        chain_id: ChainId,
165        heights: &[BlockHeight],
166    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
167
168    /// Reads raw certificates by heights for a given chain.
169    /// Returns a vector where each element corresponds to the input height.
170    /// Elements are `None` if no certificate exists at that height.
171    /// Each found certificate is returned as a tuple of (lite_certificate_bytes, confirmed_block_bytes).
172    async fn read_certificates_by_heights_raw(
173        &self,
174        chain_id: ChainId,
175        heights: &[BlockHeight],
176    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
177
178    /// Returns a vector of certificate hashes for the requested chain and heights.
179    /// The resulting vector maintains the order of the input `heights` argument.
180    /// Elements are `None` if no certificate exists at that height.
181    async fn read_certificate_hashes_by_heights(
182        &self,
183        chain_id: ChainId,
184        heights: &[BlockHeight],
185    ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
186
187    /// Reads the event with the given ID.
188    async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
189
190    /// Tests existence of the event with the given ID.
191    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
192
193    /// Lists all the events from a starting index
194    async fn read_events_from_index(
195        &self,
196        chain_id: &ChainId,
197        stream_id: &StreamId,
198        start_index: u32,
199    ) -> Result<Vec<IndexAndEvent>, ViewError>;
200
201    /// Writes a vector of events.
202    async fn write_events(
203        &self,
204        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
205    ) -> Result<(), ViewError>;
206
207    /// Reads the network description.
208    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
209
210    /// Writes the network description.
211    async fn write_network_description(
212        &self,
213        information: &NetworkDescription,
214    ) -> Result<(), ViewError>;
215
216    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
217    ///
218    /// # Notes
219    ///
220    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
221    /// the same chain active at any given moment, they will race to access persistent storage.
222    /// This can lead to invalid states and data corruption.
223    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
224    where
225        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
226    {
227        let id = description.id();
228        // Store the description blob.
229        self.write_blob(&Blob::new_chain_description(&description))
230            .await?;
231        let mut chain = self.load_chain(id).await?;
232        assert!(!chain.is_active(), "Attempting to create a chain twice");
233        let current_time = self.clock().current_time();
234        chain.initialize_if_needed(current_time).await?;
235        chain.save().await?;
236        Ok(())
237    }
238
239    /// Selects the WebAssembly runtime to use for applications (if any).
240    fn wasm_runtime(&self) -> Option<WasmRuntime>;
241
242    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
243    /// by the `application_description`.
244    async fn load_contract(
245        &self,
246        application_description: &ApplicationDescription,
247        txn_tracker: &TransactionTracker,
248    ) -> Result<UserContractCode, ExecutionError> {
249        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
250        let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
251            Some(content) => content.clone(),
252            None => self
253                .read_blob(contract_bytecode_blob_id)
254                .await?
255                .ok_or(ExecutionError::BlobsNotFound(vec![
256                    contract_bytecode_blob_id,
257                ]))?
258                .into_content(),
259        };
260        let compressed_contract_bytecode = CompressedBytecode {
261            compressed_bytes: content.into_arc_bytes(),
262        };
263        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
264        let contract_bytecode = self
265            .thread_pool()
266            .run_send((), move |()| async move {
267                compressed_contract_bytecode.decompress()
268            })
269            .await
270            .await??;
271        match application_description.module_id.vm_runtime {
272            VmRuntime::Wasm => {
273                cfg_if::cfg_if! {
274                    if #[cfg(with_wasm_runtime)] {
275                        let Some(wasm_runtime) = self.wasm_runtime() else {
276                            panic!("A Wasm runtime is required to load user applications.");
277                        };
278                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
279                           .await?
280                           .into())
281                    } else {
282                        panic!(
283                            "A Wasm runtime is required to load user applications. \
284                             Please enable the `wasmer` or the `wasmtime` feature flags \
285                             when compiling `linera-storage`."
286                        );
287                    }
288                }
289            }
290            VmRuntime::Evm => {
291                cfg_if::cfg_if! {
292                    if #[cfg(with_revm)] {
293                        let evm_runtime = EvmRuntime::Revm;
294                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
295                           .into())
296                    } else {
297                        panic!(
298                            "An Evm runtime is required to load user applications. \
299                             Please enable the `revm` feature flag \
300                             when compiling `linera-storage`."
301                        );
302                    }
303                }
304            }
305        }
306    }
307
308    /// Creates a [`UserServiceCode`] instance using the bytecode in storage referenced
309    /// by the `application_description`.
310    async fn load_service(
311        &self,
312        application_description: &ApplicationDescription,
313        txn_tracker: &TransactionTracker,
314    ) -> Result<UserServiceCode, ExecutionError> {
315        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
316        let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
317            Some(content) => content.clone(),
318            None => self
319                .read_blob(service_bytecode_blob_id)
320                .await?
321                .ok_or(ExecutionError::BlobsNotFound(vec![
322                    service_bytecode_blob_id,
323                ]))?
324                .into_content(),
325        };
326        let compressed_service_bytecode = CompressedBytecode {
327            compressed_bytes: content.into_arc_bytes(),
328        };
329        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
330        let service_bytecode = self
331            .thread_pool()
332            .run_send((), move |()| async move {
333                compressed_service_bytecode.decompress()
334            })
335            .await
336            .await??;
337        match application_description.module_id.vm_runtime {
338            VmRuntime::Wasm => {
339                cfg_if::cfg_if! {
340                    if #[cfg(with_wasm_runtime)] {
341                        let Some(wasm_runtime) = self.wasm_runtime() else {
342                            panic!("A Wasm runtime is required to load user applications.");
343                        };
344                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
345                           .await?
346                           .into())
347                    } else {
348                        panic!(
349                            "A Wasm runtime is required to load user applications. \
350                             Please enable the `wasmer` or the `wasmtime` feature flags \
351                             when compiling `linera-storage`."
352                        );
353                    }
354                }
355            }
356            VmRuntime::Evm => {
357                cfg_if::cfg_if! {
358                    if #[cfg(with_revm)] {
359                        let evm_runtime = EvmRuntime::Revm;
360                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
361                           .into())
362                    } else {
363                        panic!(
364                            "An Evm runtime is required to load user applications. \
365                             Please enable the `revm` feature flag \
366                             when compiling `linera-storage`."
367                        );
368                    }
369                }
370            }
371        }
372    }
373
374    async fn block_exporter_context(
375        &self,
376        block_exporter_id: u32,
377    ) -> Result<Self::BlockExporterContext, ViewError>;
378
379    /// Lists the blob IDs in storage.
380    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
381
382    /// Lists the chain IDs in storage.
383    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
384
385    /// Lists the event IDs in storage.
386    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
387}
388
389/// The result of processing the obtained read certificates.
390pub enum ResultReadCertificates {
391    Certificates(Vec<ConfirmedBlockCertificate>),
392    InvalidHashes(Vec<CryptoHash>),
393}
394
395impl ResultReadCertificates {
396    /// Creating the processed read certificates.
397    pub fn new(
398        certificates: Vec<Option<ConfirmedBlockCertificate>>,
399        hashes: Vec<CryptoHash>,
400    ) -> Self {
401        let (certificates, invalid_hashes) = certificates
402            .into_iter()
403            .zip(hashes)
404            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
405                Some(cert) => itertools::Either::Left(cert),
406                None => itertools::Either::Right(hash),
407            });
408        if invalid_hashes.is_empty() {
409            Self::Certificates(certificates)
410        } else {
411            Self::InvalidHashes(invalid_hashes)
412        }
413    }
414}
415
416/// The result of processing the obtained read confirmed blocks.
417pub enum ResultReadConfirmedBlocks {
418    Blocks(Vec<ConfirmedBlock>),
419    InvalidHashes(Vec<CryptoHash>),
420}
421
422impl ResultReadConfirmedBlocks {
423    /// Creating the processed read confirmed blocks.
424    pub fn new(blocks: Vec<Option<ConfirmedBlock>>, hashes: Vec<CryptoHash>) -> Self {
425        let (blocks, invalid_hashes) = blocks
426            .into_iter()
427            .zip(hashes)
428            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(block, hash)| match block {
429                Some(block) => itertools::Either::Left(block),
430                None => itertools::Either::Right(hash),
431            });
432        if invalid_hashes.is_empty() {
433            Self::Blocks(blocks)
434        } else {
435            Self::InvalidHashes(invalid_hashes)
436        }
437    }
438}
439
440/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
441#[derive(Clone)]
442pub struct ChainRuntimeContext<S> {
443    storage: S,
444    chain_id: ChainId,
445    thread_pool: Arc<linera_execution::ThreadPool>,
446    execution_runtime_config: ExecutionRuntimeConfig,
447    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
448    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
449}
450
451#[cfg_attr(not(web), async_trait)]
452#[cfg_attr(web, async_trait(?Send))]
453impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
454    fn chain_id(&self) -> ChainId {
455        self.chain_id
456    }
457
458    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
459        &self.thread_pool
460    }
461
462    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
463        self.execution_runtime_config
464    }
465
466    fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
467        &self.user_contracts
468    }
469
470    fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
471        &self.user_services
472    }
473
474    async fn get_user_contract(
475        &self,
476        description: &ApplicationDescription,
477        txn_tracker: &TransactionTracker,
478    ) -> Result<UserContractCode, ExecutionError> {
479        let application_id = description.into();
480        let pinned = self.user_contracts.pin_owned();
481        if let Some(contract) = pinned.get(&application_id) {
482            return Ok(contract.clone());
483        }
484        let contract = self.storage.load_contract(description, txn_tracker).await?;
485        pinned.insert(application_id, contract.clone());
486        Ok(contract)
487    }
488
489    async fn get_user_service(
490        &self,
491        description: &ApplicationDescription,
492        txn_tracker: &TransactionTracker,
493    ) -> Result<UserServiceCode, ExecutionError> {
494        let application_id = description.into();
495        let pinned = self.user_services.pin_owned();
496        if let Some(service) = pinned.get(&application_id) {
497            return Ok(service.clone());
498        }
499        let service = self.storage.load_service(description, txn_tracker).await?;
500        pinned.insert(application_id, service.clone());
501        Ok(service)
502    }
503
504    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
505        self.storage.read_blob(blob_id).await
506    }
507
508    async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
509        self.storage.read_event(event_id).await
510    }
511
512    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
513        self.storage.read_network_description().await
514    }
515
516    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
517        self.storage.contains_blob(blob_id).await
518    }
519
520    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
521        self.storage.contains_event(event_id).await
522    }
523
524    #[cfg(with_testing)]
525    async fn add_blobs(
526        &self,
527        blobs: impl IntoIterator<Item = Blob> + Send,
528    ) -> Result<(), ViewError> {
529        let blobs = Vec::from_iter(blobs);
530        self.storage.write_blobs(&blobs).await
531    }
532
533    #[cfg(with_testing)]
534    async fn add_events(
535        &self,
536        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
537    ) -> Result<(), ViewError> {
538        self.storage.write_events(events).await
539    }
540}
541
542/// A clock that can be used to get the current `Timestamp`.
543#[cfg_attr(not(web), async_trait)]
544#[cfg_attr(web, async_trait(?Send))]
545pub trait Clock {
546    fn current_time(&self) -> Timestamp;
547
548    async fn sleep(&self, delta: TimeDelta);
549
550    async fn sleep_until(&self, timestamp: Timestamp);
551}
552
553#[cfg(test)]
554mod tests {
555    use std::collections::BTreeMap;
556
557    use linera_base::{
558        crypto::{AccountPublicKey, CryptoHash},
559        data_types::{
560            Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
561            Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
562        },
563        identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
564        ownership::ChainOwnership,
565    };
566    use linera_chain::{
567        block::{Block, ConfirmedBlock},
568        data_types::{BlockExecutionOutcome, ProposedBlock},
569    };
570    use linera_execution::BlobState;
571    #[cfg(feature = "dynamodb")]
572    use linera_views::dynamo_db::DynamoDbDatabase;
573    #[cfg(feature = "scylladb")]
574    use linera_views::scylla_db::ScyllaDbDatabase;
575    use linera_views::{memory::MemoryDatabase, ViewError};
576    use test_case::test_case;
577
578    use super::*;
579    use crate::db_storage::DbStorage;
580
581    /// Generic test function to test Storage trait features
582    async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
583    where
584        S::Context: Send + Sync,
585    {
586        // Test clock functionality
587        let _current_time = storage.clock().current_time();
588        let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));
589
590        // Test loading a chain (this creates a chain state view)
591        let _chain_view = storage.load_chain(test_chain_id).await?;
592
593        // Test block exporter context
594        let _block_exporter_context = storage.block_exporter_context(0).await?;
595        Ok(())
596    }
597
598    async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
599    where
600        S::Context: Send + Sync,
601    {
602        // Create test blobs
603        let chain_description = ChainDescription::new(
604            ChainOrigin::Root(0),
605            InitialChainConfig {
606                ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
607                epoch: Epoch::ZERO,
608                min_active_epoch: Epoch::ZERO,
609                max_active_epoch: Epoch::ZERO,
610                balance: Amount::ZERO,
611                application_permissions: ApplicationPermissions::default(),
612            },
613            Timestamp::from(0),
614        );
615
616        let test_blob1 = Blob::new_chain_description(&chain_description);
617        let test_blob2 = Blob::new_data(vec![10, 20, 30]);
618        let test_blob3 = Blob::new_data(vec![40, 50, 60]);
619
620        // Testing blobs existence
621        let blob_id1 = test_blob1.id();
622        let blob_id2 = test_blob2.id();
623        let blob_id3 = test_blob3.id();
624
625        // Test blob existence before writing
626        assert!(!storage.contains_blob(blob_id1).await?);
627        assert!(!storage.contains_blob(blob_id2).await?);
628        assert!(!storage.contains_blob(blob_id3).await?);
629
630        // Test single blob write
631        storage.write_blob(&test_blob1).await?;
632        assert!(storage.contains_blob(blob_id1).await?);
633
634        // Test multiple blob write (write_blobs)
635        storage
636            .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
637            .await?;
638        assert!(storage.contains_blob(blob_id2).await?);
639        assert!(storage.contains_blob(blob_id3).await?);
640
641        // Test single blob read
642        let read_blob = storage.read_blob(blob_id1).await?;
643        assert_eq!(read_blob, Some(test_blob1.clone()));
644
645        // Test multiple blob read (read_blobs)
646        let blob_ids = vec![blob_id1, blob_id2, blob_id3];
647        let read_blobs = storage.read_blobs(&blob_ids).await?;
648        assert_eq!(read_blobs.len(), 3);
649
650        // Verify each blob was read correctly
651        assert_eq!(read_blobs[0], Some(test_blob1.clone()));
652        assert_eq!(read_blobs[1], Some(test_blob2));
653        assert_eq!(read_blobs[2], Some(test_blob3));
654
655        // Test missing blobs detection
656        let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
657        let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
658        assert_eq!(missing_blobs, vec![missing_blob_id]);
659
660        // Test maybe_write_blobs (should return false as blobs don't have blob states yet)
661        let write_results = storage
662            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
663            .await?;
664        assert_eq!(write_results, vec![false]);
665
666        // Test blob state operations
667        let blob_state1 = BlobState {
668            last_used_by: None,
669            chain_id: ChainId(CryptoHash::test_hash("chain1")),
670            block_height: BlockHeight(0),
671            epoch: Some(Epoch::ZERO),
672        };
673        let blob_state2 = BlobState {
674            last_used_by: Some(CryptoHash::test_hash("cert")),
675            chain_id: ChainId(CryptoHash::test_hash("chain2")),
676            block_height: BlockHeight(1),
677            epoch: Some(Epoch::from(1)),
678        };
679
680        // Test blob state existence before writing
681        assert!(!storage.contains_blob_state(blob_id1).await?);
682        assert!(!storage.contains_blob_state(blob_id2).await?);
683
684        // Test blob state writing
685        storage
686            .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
687            .await?;
688        storage
689            .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
690            .await?;
691
692        // Test blob state existence after writing
693        assert!(storage.contains_blob_state(blob_id1).await?);
694        assert!(storage.contains_blob_state(blob_id2).await?);
695
696        // Test single blob state read
697        let read_blob_state = storage.read_blob_state(blob_id1).await?;
698        assert_eq!(read_blob_state, Some(blob_state1.clone()));
699
700        // Test multiple blob state read (read_blob_states)
701        let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
702        assert_eq!(read_blob_states.len(), 2);
703
704        // Verify blob states
705        assert_eq!(read_blob_states[0], Some(blob_state1));
706        assert_eq!(read_blob_states[1], Some(blob_state2));
707
708        // Test maybe_write_blobs now that blob states exist (should return true)
709        let write_results = storage
710            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
711            .await?;
712        assert_eq!(write_results, vec![true]);
713
714        Ok(())
715    }
716
717    async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
718    where
719        S::Context: Send + Sync,
720    {
721        let cert_hash = CryptoHash::test_hash("certificate");
722
723        // Test certificate existence (should be false initially)
724        assert!(!storage.contains_certificate(cert_hash).await?);
725
726        // Test reading non-existent certificate
727        assert!(storage.read_certificate(cert_hash).await?.is_none());
728
729        // Test reading multiple certificates
730        let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
731        let certs_result = storage.read_certificates(&cert_hashes).await?;
732        assert_eq!(certs_result.len(), 2);
733        assert!(certs_result[0].is_none());
734        assert!(certs_result[1].is_none());
735
736        // Test raw certificate reading
737        let raw_certs_result = storage.read_certificates_raw(&cert_hashes).await?;
738        assert!(raw_certs_result.iter().all(|cert| cert.is_none())); // No certificates exist
739
740        // Test confirmed block reading
741        let block_hash = CryptoHash::test_hash("block");
742        let block_result = storage.read_confirmed_block(block_hash).await?;
743        assert!(block_result.is_none());
744
745        // Test write_blobs_and_certificate functionality
746        // Create test blobs
747        let test_blob1 = Blob::new_data(vec![1, 2, 3]);
748        let test_blob2 = Blob::new_data(vec![4, 5, 6]);
749        let blobs = vec![test_blob1, test_blob2];
750
751        // Create a test certificate using the working pattern from linera-indexer tests
752        let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));
753
754        // Create a minimal proposed block (genesis block)
755        let proposed_block = ProposedBlock {
756            epoch: Epoch::ZERO,
757            chain_id,
758            transactions: vec![],
759            previous_block_hash: None,
760            height: BlockHeight::ZERO,
761            authenticated_owner: None,
762            timestamp: Timestamp::default(),
763        };
764
765        // Create a minimal block execution outcome with proper BTreeMap types
766        let outcome = BlockExecutionOutcome {
767            messages: vec![],
768            state_hash: CryptoHash::default(),
769            oracle_responses: vec![],
770            events: vec![],
771            blobs: vec![],
772            operation_results: vec![],
773            previous_event_blocks: BTreeMap::new(),
774            previous_message_blocks: BTreeMap::new(),
775        };
776
777        let block = Block::new(proposed_block, outcome);
778        let confirmed_block = ConfirmedBlock::new(block);
779        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
780
781        // Test writing blobs and certificate together
782        storage
783            .write_blobs_and_certificate(&blobs, &certificate)
784            .await?;
785
786        // Verify the certificate was written
787        let cert_hash = certificate.hash();
788        assert!(storage.contains_certificate(cert_hash).await?);
789
790        // Verify the certificate can be read back
791        let read_certificate = storage.read_certificate(cert_hash).await?;
792        assert!(read_certificate.is_some());
793        assert_eq!(read_certificate.unwrap().hash(), cert_hash);
794
795        // Verify the blobs were written
796        for blob in &blobs {
797            assert!(storage.contains_blob(blob.id()).await?);
798        }
799
800        Ok(())
801    }
802
803    async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
804    where
805        S::Context: Send + Sync,
806    {
807        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
808        let stream_id = StreamId::system("test_stream");
809
810        // Test multiple events
811        let event_id1 = EventId {
812            chain_id,
813            stream_id: stream_id.clone(),
814            index: 0,
815        };
816        let event_id2 = EventId {
817            chain_id,
818            stream_id: stream_id.clone(),
819            index: 1,
820        };
821        let event_id3 = EventId {
822            chain_id,
823            stream_id: stream_id.clone(),
824            index: 2,
825        };
826
827        let event_data1 = vec![1, 2, 3];
828        let event_data2 = vec![4, 5, 6];
829        let event_data3 = vec![7, 8, 9];
830
831        // Test event existence before writing
832        assert!(!storage.contains_event(event_id1.clone()).await?);
833        assert!(!storage.contains_event(event_id2.clone()).await?);
834
835        // Write multiple events
836        storage
837            .write_events([
838                (event_id1.clone(), event_data1.clone()),
839                (event_id2.clone(), event_data2.clone()),
840                (event_id3.clone(), event_data3.clone()),
841            ])
842            .await?;
843
844        // Test event existence after writing
845        assert!(storage.contains_event(event_id1.clone()).await?);
846        assert!(storage.contains_event(event_id2.clone()).await?);
847        assert!(storage.contains_event(event_id3.clone()).await?);
848
849        // Test individual event reading
850        let read_event1 = storage.read_event(event_id1).await?;
851        assert_eq!(read_event1, Some(event_data1));
852
853        let read_event2 = storage.read_event(event_id2).await?;
854        assert_eq!(read_event2, Some(event_data2));
855
856        // Test reading events from index
857        let events_from_index = storage
858            .read_events_from_index(&chain_id, &stream_id, 1)
859            .await?;
860        assert!(events_from_index.len() >= 2); // Should contain events at index 1 and 2
861        Ok(())
862    }
863
864    async fn test_storage_network_description<S: Storage + Sync>(
865        storage: &S,
866    ) -> Result<(), ViewError>
867    where
868        S::Context: Send + Sync,
869    {
870        let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));
871
872        let network_desc = NetworkDescription {
873            name: "test_network".to_string(),
874            genesis_config_hash: CryptoHash::test_hash("genesis_config"),
875            genesis_timestamp: Timestamp::from(0),
876            genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
877            admin_chain_id,
878        };
879
880        // Test reading non-existent network description
881        assert!(storage.read_network_description().await?.is_none());
882
883        // Write network description
884        storage.write_network_description(&network_desc).await?;
885
886        // Test reading existing network description
887        let read_desc = storage.read_network_description().await?;
888        assert_eq!(read_desc, Some(network_desc));
889
890        Ok(())
891    }
892
893    /// Generic test function to test Storage trait features
894    #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
895    #[cfg_attr(feature = "dynamodb", test_case(DbStorage::<DynamoDbDatabase, _>::make_test_storage(None).await; "dynamo_db"))]
896    #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
897    #[test_log::test(tokio::test)]
898    async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
899    where
900        S::Context: Send + Sync,
901    {
902        test_storage_chain_exporter(&storage).await?;
903        test_storage_blob(&storage).await?;
904        test_storage_certificate(&storage).await?;
905        test_storage_event(&storage).await?;
906        test_storage_network_description(&storage).await?;
907        Ok(())
908    }
909}