linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6mod db_storage;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13    crypto::CryptoHash,
14    data_types::{
15        ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode,
16        NetworkDescription, Timestamp,
17    },
18    identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
19    vm::VmRuntime,
20};
21pub use linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS;
22use linera_chain::{
23    types::{ConfirmedBlock, ConfirmedBlockCertificate},
24    ChainError, ChainStateView,
25};
26use linera_execution::{
27    committee::Committee, BlobState, ExecutionError, ExecutionRuntimeConfig,
28    ExecutionRuntimeContext, SharedCommittees, TransactionTracker, UserContractCode,
29    UserServiceCode, WasmRuntime,
30};
31#[cfg(with_revm)]
32use linera_execution::{
33    evm::revm::{EvmContractModule, EvmServiceModule},
34    EvmRuntime,
35};
36#[cfg(with_wasm_runtime)]
37use linera_execution::{WasmContractModule, WasmServiceModule};
38use linera_views::{context::Context, views::RootView, ViewError};
39
40#[cfg(with_metrics)]
41pub use crate::db_storage::metrics;
42pub use crate::db_storage::{
43    ChainStatesFirstAssignment, DbStorage, StorageCacheConfig, StorageCaches, WallClock,
44};
45#[cfg(with_testing)]
46pub use crate::db_storage::{TestClock, DEFAULT_STORAGE_CACHE_CONFIG};
47
/// The default namespace to be used when none is specified.
pub const DEFAULT_NAMESPACE: &str = "default";
50
51/// Communicate with a persistent storage using the "views" abstraction.
52#[cfg_attr(not(web), async_trait)]
53#[cfg_attr(web, async_trait(?Send))]
54pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
55    /// The low-level storage implementation in use by the core protocol (chain workers etc).
56    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
57
58    /// The clock type being used.
59    type Clock: Clock + Clone + Send + Sync;
60
61    /// The low-level storage implementation in use by the block exporter.
62    type BlockExporterContext: Context<Extra = u32> + Clone;
63
    /// Returns the clock used by this storage.
    ///
    /// The current time is obtained from the returned value, e.g. via
    /// `storage.clock().current_time()`.
    fn clock(&self) -> &Self::Clock;
66
    /// Returns the shared [`linera_execution::ThreadPool`] of this storage.
    ///
    /// The default implementations of `load_contract` and `load_service` use it to run
    /// bytecode decompression.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
68
69    /// Loads the view of a chain state.
70    ///
71    /// # Notes
72    ///
73    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
74    /// instances of the same chain active at any given moment, they will race to access persistent
75    /// storage. This can lead to invalid states and data corruption.
76    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
77
78    /// Tests the existence of a blob with the given blob ID.
79    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
80
81    /// Returns what blobs from the input are missing from storage.
82    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
83
84    /// Tests existence of a blob state with the given blob ID.
85    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
86
    /// Reads the confirmed block with the given hash, if any.
    async fn read_confirmed_block(
        &self,
        hash: CryptoHash,
    ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError>;
92
93    /// Reads a number of confirmed blocks by their hashes.
94    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
95        &self,
96        hashes: I,
97    ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError>;
98
99    /// Reads the blob with the given blob ID.
100    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError>;
101
102    /// Reads the blobs with the given blob IDs.
103    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError>;
104
105    /// Reads the blob state with the given blob ID.
106    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
107
108    /// Reads the blob states with the given blob IDs.
109    async fn read_blob_states(
110        &self,
111        blob_ids: &[BlobId],
112    ) -> Result<Vec<Option<BlobState>>, ViewError>;
113
114    /// Writes the given blob.
115    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
116
117    /// Writes blobs and certificate
118    async fn write_blobs_and_certificate(
119        &self,
120        blobs: &[Blob],
121        certificate: &ConfirmedBlockCertificate,
122    ) -> Result<(), ViewError>;
123
124    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
125    /// blobs that were written.
126    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
127
    /// Attempts to write the given blob state for each of the given blob IDs.
    ///
    /// Nothing is returned on success besides `()`.
    ///
    /// NOTE(review): the conditions under which an already existing blob state is kept or
    /// replaced are decided by the implementation and are not visible here; confirm against
    /// the implementation before relying on them.
    async fn maybe_write_blob_states(
        &self,
        blob_ids: &[BlobId],
        blob_state: BlobState,
    ) -> Result<(), ViewError>;
134
135    /// Writes several blobs.
136    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
137
138    /// Tests existence of the certificate with the given hash.
139    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
140
141    /// Inserts a certificate into the in-memory dedup cache and returns the
142    /// canonical [`Arc`]. If the cache already holds an `Arc` for this hash,
143    /// the passed-in `certificate` is dropped and the existing `Arc` is
144    /// returned. This must be used (rather than `Arc::new`) for any
145    /// freshly-constructed [`ConfirmedBlockCertificate`] that should
146    /// participate in the "one allocation per content" invariant.
147    fn cache_certificate(
148        &self,
149        certificate: ConfirmedBlockCertificate,
150    ) -> Arc<ConfirmedBlockCertificate>;
151
152    /// Inserts a blob into the in-memory dedup cache and returns the canonical
153    /// [`Arc`]. If the cache already holds an `Arc` for this blob ID, the
154    /// passed-in `blob` is dropped and the existing `Arc` is returned. This
155    /// must be used (rather than `Arc::new`) for any freshly-constructed
156    /// [`Blob`] that should participate in the "one allocation per content"
157    /// invariant.
158    fn cache_blob(&self, blob: Blob) -> Arc<Blob>;
159
160    /// Inserts a confirmed block into the in-memory dedup cache and returns
161    /// the canonical [`Arc`]. If the cache already holds an `Arc` for this
162    /// hash, the passed-in `block` is dropped and the existing `Arc` is
163    /// returned. This must be used (rather than `Arc::new`) for any
164    /// freshly-constructed [`ConfirmedBlock`] that should participate in the
165    /// "one allocation per content" invariant.
166    fn cache_confirmed_block(&self, block: ConfirmedBlock) -> Arc<ConfirmedBlock>;
167
168    /// Reads the certificate with the given hash.
169    async fn read_certificate(
170        &self,
171        hash: CryptoHash,
172    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError>;
173
174    /// Reads a number of certificates
175    async fn read_certificates(
176        &self,
177        hashes: &[CryptoHash],
178    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
179
180    /// Reads raw certificate bytes by hashes.
181    ///
182    /// Returns a vector where each element corresponds to the input hash.
183    /// Elements are `None` if no certificate exists for that hash.
184    /// Each found certificate is returned as `Some((lite_certificate_bytes, confirmed_block_bytes))`.
185    async fn read_certificates_raw(
186        &self,
187        hashes: &[CryptoHash],
188    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
189
190    /// Reads certificates by heights for a given chain.
191    /// Returns a vector where each element corresponds to the input height.
192    /// Elements are `None` if no certificate exists at that height.
193    async fn read_certificates_by_heights(
194        &self,
195        chain_id: ChainId,
196        heights: &[BlockHeight],
197    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
198
199    /// Reads raw certificates by heights for a given chain.
200    /// Returns a vector where each element corresponds to the input height.
201    /// Elements are `None` if no certificate exists at that height.
202    /// Each found certificate is returned as a tuple of (lite_certificate_bytes, confirmed_block_bytes).
203    async fn read_certificates_by_heights_raw(
204        &self,
205        chain_id: ChainId,
206        heights: &[BlockHeight],
207    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
208
209    /// Returns a vector of certificate hashes for the requested chain and heights.
210    /// The resulting vector maintains the order of the input `heights` argument.
211    /// Elements are `None` if no certificate exists at that height.
212    async fn read_certificate_hashes_by_heights(
213        &self,
214        chain_id: ChainId,
215        heights: &[BlockHeight],
216    ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
217
218    /// Looks up the block heights where the given events were published.
219    /// Returns `None` for events that are not in the index.
220    async fn read_event_block_heights(
221        &self,
222        event_ids: &[EventId],
223    ) -> Result<Vec<Option<BlockHeight>>, ViewError>;
224
225    /// Reads the event with the given ID.
226    async fn read_event(&self, id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError>;
227
228    /// Tests existence of the event with the given ID.
229    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
230
231    /// Lists all the events from a starting index
232    async fn read_events_from_index(
233        &self,
234        chain_id: &ChainId,
235        stream_id: &StreamId,
236        start_index: u32,
237    ) -> Result<Vec<IndexAndEvent>, ViewError>;
238
239    /// Writes a vector of events.
240    async fn write_events(
241        &self,
242        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
243    ) -> Result<(), ViewError>;
244
245    /// Reads the network description.
246    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
247
248    /// Writes the network description.
249    async fn write_network_description(
250        &self,
251        information: &NetworkDescription,
252    ) -> Result<(), ViewError>;
253
254    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
255    ///
256    /// # Notes
257    ///
258    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
259    /// the same chain active at any given moment, they will race to access persistent storage.
260    /// This can lead to invalid states and data corruption.
261    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
262    where
263        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
264    {
265        let id = description.id();
266        // Store the description blob.
267        self.write_blob(&Blob::new_chain_description(&description))
268            .await?;
269        let mut chain = self.load_chain(id).await?;
270        assert!(
271            !chain.is_active().await?,
272            "Attempting to create a chain twice"
273        );
274        let current_time = self.clock().current_time();
275        chain.initialize_if_needed(current_time).await?;
276        chain.save().await?;
277        Ok(())
278    }
279
280    /// Selects the WebAssembly runtime to use for applications (if any).
281    fn wasm_runtime(&self) -> Option<WasmRuntime>;
282
283    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
284    /// by the `application_description`.
285    async fn load_contract(
286        &self,
287        application_description: &ApplicationDescription,
288        txn_tracker: &TransactionTracker,
289    ) -> Result<UserContractCode, ExecutionError> {
290        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
291        let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
292            Some(content) => content.clone(),
293            None => self
294                .read_blob(contract_bytecode_blob_id)
295                .await?
296                .ok_or(ExecutionError::BlobsNotFound(vec![
297                    contract_bytecode_blob_id,
298                ]))?
299                .content()
300                .clone(),
301        };
302        let compressed_contract_bytecode = CompressedBytecode {
303            compressed_bytes: content.into_arc_bytes(),
304        };
305        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
306        let contract_bytecode = self
307            .thread_pool()
308            .run_send((), move |()| async move {
309                compressed_contract_bytecode.decompress()
310            })
311            .await
312            .await??;
313        match application_description.module_id.vm_runtime {
314            VmRuntime::Wasm => {
315                cfg_if::cfg_if! {
316                    if #[cfg(with_wasm_runtime)] {
317                        let Some(wasm_runtime) = self.wasm_runtime() else {
318                            panic!("A Wasm runtime is required to load user applications.");
319                        };
320                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
321                           .await?
322                           .into())
323                    } else {
324                        panic!(
325                            "A Wasm runtime is required to load user applications. \
326                             Please enable the `wasmer` or the `wasmtime` feature flags \
327                             when compiling `linera-storage`."
328                        );
329                    }
330                }
331            }
332            VmRuntime::Evm => {
333                cfg_if::cfg_if! {
334                    if #[cfg(with_revm)] {
335                        let evm_runtime = EvmRuntime::Revm;
336                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
337                           .into())
338                    } else {
339                        panic!(
340                            "An Evm runtime is required to load user applications. \
341                             Please enable the `revm` feature flag \
342                             when compiling `linera-storage`."
343                        );
344                    }
345                }
346            }
347        }
348    }
349
350    /// Creates a [`UserServiceCode`] instance using the bytecode in storage referenced
351    /// by the `application_description`.
352    async fn load_service(
353        &self,
354        application_description: &ApplicationDescription,
355        txn_tracker: &TransactionTracker,
356    ) -> Result<UserServiceCode, ExecutionError> {
357        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
358        let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
359            Some(content) => content.clone(),
360            None => self
361                .read_blob(service_bytecode_blob_id)
362                .await?
363                .ok_or(ExecutionError::BlobsNotFound(vec![
364                    service_bytecode_blob_id,
365                ]))?
366                .content()
367                .clone(),
368        };
369        let compressed_service_bytecode = CompressedBytecode {
370            compressed_bytes: content.into_arc_bytes(),
371        };
372        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
373        let service_bytecode = self
374            .thread_pool()
375            .run_send((), move |()| async move {
376                compressed_service_bytecode.decompress()
377            })
378            .await
379            .await??;
380        match application_description.module_id.vm_runtime {
381            VmRuntime::Wasm => {
382                cfg_if::cfg_if! {
383                    if #[cfg(with_wasm_runtime)] {
384                        let Some(wasm_runtime) = self.wasm_runtime() else {
385                            panic!("A Wasm runtime is required to load user applications.");
386                        };
387                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
388                           .await?
389                           .into())
390                    } else {
391                        panic!(
392                            "A Wasm runtime is required to load user applications. \
393                             Please enable the `wasmer` or the `wasmtime` feature flags \
394                             when compiling `linera-storage`."
395                        );
396                    }
397                }
398            }
399            VmRuntime::Evm => {
400                cfg_if::cfg_if! {
401                    if #[cfg(with_revm)] {
402                        let evm_runtime = EvmRuntime::Revm;
403                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
404                           .into())
405                    } else {
406                        panic!(
407                            "An Evm runtime is required to load user applications. \
408                             Please enable the `revm` feature flag \
409                             when compiling `linera-storage`."
410                        );
411                    }
412                }
413            }
414        }
415    }
416
    /// Returns the storage context used by the block exporter with the given
    /// `block_exporter_id`.
    async fn block_exporter_context(
        &self,
        block_exporter_id: u32,
    ) -> Result<Self::BlockExporterContext, ViewError>;
421
422    /// Returns the process-wide committee cache shared by all chains.
423    fn shared_committees(&self) -> &SharedCommittees;
424
425    /// Returns the committee whose serialized form hashes to `hash`, loading it
426    /// from the blob store on cache miss.
427    async fn get_or_load_committee_by_hash(
428        &self,
429        hash: CryptoHash,
430    ) -> Result<Arc<Committee>, ExecutionError> {
431        if let Some(committee) = self.shared_committees().get(hash) {
432            return Ok(committee);
433        }
434        let blob_id = BlobId::new(hash, BlobType::Committee);
435        let blob = self
436            .read_blob(blob_id)
437            .await?
438            .ok_or(ExecutionError::BlobsNotFound(vec![blob_id]))?;
439        let committee = bcs::from_bytes(blob.bytes())?;
440        Ok(self.shared_committees().insert(hash, Arc::new(committee)))
441    }
442
443    /// Lists the blob IDs in storage.
444    async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
445
446    /// Lists the chain IDs in storage.
447    async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
448
449    /// Lists the event IDs in storage.
450    async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
451}
452
/// The result of processing the obtained read certificates.
pub enum ResultReadCertificates {
    /// Every looked-up certificate was present; holds the certificates in lookup order.
    Certificates(Vec<ConfirmedBlockCertificate>),
    /// At least one certificate was missing; holds the hashes whose lookup returned `None`.
    InvalidHashes(Vec<CryptoHash>),
}
458
459impl ResultReadCertificates {
460    /// Creating the processed read certificates.
461    pub fn new(
462        certificates: Vec<Option<Arc<ConfirmedBlockCertificate>>>,
463        hashes: Vec<CryptoHash>,
464    ) -> Self {
465        let (certificates, invalid_hashes) = certificates
466            .into_iter()
467            .zip(hashes)
468            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
469                Some(cert) => itertools::Either::Left(Arc::unwrap_or_clone(cert)),
470                None => itertools::Either::Right(hash),
471            });
472        if invalid_hashes.is_empty() {
473            Self::Certificates(certificates)
474        } else {
475            Self::InvalidHashes(invalid_hashes)
476        }
477    }
478}
479
/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// The storage that blob, event, network description, committee and bytecode lookups
    /// are delegated to.
    storage: S,
    /// The chain ID reported by `ExecutionRuntimeContext::chain_id`.
    chain_id: ChainId,
    /// The thread pool reported by `ExecutionRuntimeContext::thread_pool`.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The configuration reported by `ExecutionRuntimeContext::execution_runtime_config`.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Contract code already loaded through `get_user_contract`, keyed by application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Service code already loaded through `get_user_service`, keyed by application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
}
490
491#[cfg_attr(not(web), async_trait)]
492#[cfg_attr(web, async_trait(?Send))]
493impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
494    fn chain_id(&self) -> ChainId {
495        self.chain_id
496    }
497
498    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
499        &self.thread_pool
500    }
501
502    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
503        self.execution_runtime_config
504    }
505
506    fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
507        &self.user_contracts
508    }
509
510    fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
511        &self.user_services
512    }
513
514    async fn get_user_contract(
515        &self,
516        description: &ApplicationDescription,
517        txn_tracker: &TransactionTracker,
518    ) -> Result<UserContractCode, ExecutionError> {
519        let application_id = description.into();
520        let pinned = self.user_contracts.pin_owned();
521        if let Some(contract) = pinned.get(&application_id) {
522            return Ok(contract.clone());
523        }
524        let contract = self.storage.load_contract(description, txn_tracker).await?;
525        pinned.insert(application_id, contract.clone());
526        Ok(contract)
527    }
528
529    async fn get_user_service(
530        &self,
531        description: &ApplicationDescription,
532        txn_tracker: &TransactionTracker,
533    ) -> Result<UserServiceCode, ExecutionError> {
534        let application_id = description.into();
535        let pinned = self.user_services.pin_owned();
536        if let Some(service) = pinned.get(&application_id) {
537            return Ok(service.clone());
538        }
539        let service = self.storage.load_service(description, txn_tracker).await?;
540        pinned.insert(application_id, service.clone());
541        Ok(service)
542    }
543
544    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError> {
545        self.storage.read_blob(blob_id).await
546    }
547
548    async fn get_event(&self, event_id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError> {
549        self.storage.read_event(event_id).await
550    }
551
552    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
553        self.storage.read_network_description().await
554    }
555
556    async fn get_or_load_committee_by_hash(
557        &self,
558        hash: CryptoHash,
559    ) -> Result<Arc<Committee>, ExecutionError> {
560        self.storage.get_or_load_committee_by_hash(hash).await
561    }
562
563    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
564        self.storage.contains_blob(blob_id).await
565    }
566
567    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
568        self.storage.contains_event(event_id).await
569    }
570
571    #[cfg(with_testing)]
572    async fn add_blobs(
573        &self,
574        blobs: impl IntoIterator<Item = Blob> + Send,
575    ) -> Result<(), ViewError> {
576        let blobs = Vec::from_iter(blobs);
577        self.storage.write_blobs(&blobs).await
578    }
579
580    #[cfg(with_testing)]
581    async fn add_events(
582        &self,
583        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
584    ) -> Result<(), ViewError> {
585        self.storage.write_events(events).await
586    }
587}
588
/// A clock that can be used to get the current `Timestamp`.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Waits asynchronously, with `timestamp` as the target time.
    ///
    /// NOTE(review): presumably completes once this clock has reached `timestamp`; the exact
    /// behavior is defined by each implementation — confirm.
    async fn sleep_until(&self, timestamp: Timestamp);
}
597
598#[cfg(test)]
599mod tests {
600    use std::collections::BTreeMap;
601
602    use linera_base::{
603        crypto::{AccountPublicKey, CryptoHash},
604        data_types::{
605            Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
606            Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
607        },
608        identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
609        ownership::ChainOwnership,
610    };
611    use linera_chain::{
612        block::{Block, ConfirmedBlock},
613        data_types::{BlockExecutionOutcome, ProposedBlock},
614    };
615    use linera_execution::BlobState;
616    #[cfg(feature = "dynamodb")]
617    use linera_views::dynamo_db::DynamoDbDatabase;
618    #[cfg(feature = "scylladb")]
619    use linera_views::scylla_db::ScyllaDbDatabase;
620    use linera_views::{memory::MemoryDatabase, ViewError};
621    use test_case::test_case;
622
623    use super::*;
624    use crate::db_storage::DbStorage;
625
626    /// Generic test function to test Storage trait features
627    async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
628    where
629        S::Context: Send + Sync,
630    {
631        // Test clock functionality
632        let _current_time = storage.clock().current_time();
633        let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));
634
635        // Test loading a chain (this creates a chain state view)
636        let _chain_view = storage.load_chain(test_chain_id).await?;
637
638        // Test block exporter context
639        let _block_exporter_context = storage.block_exporter_context(0).await?;
640        Ok(())
641    }
642
    /// Generic test for the blob and blob-state methods of the [`Storage`] trait.
    ///
    /// The steps depend on each other: the storage is expected not to contain the test blobs
    /// or their blob states when the test starts, and later assertions rely on the writes
    /// performed by earlier steps.
    async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
    where
        S::Context: Send + Sync,
    {
        // Create test blobs
        let chain_description = ChainDescription::new(
            ChainOrigin::Root(0),
            InitialChainConfig {
                ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
                epoch: Epoch::ZERO,
                min_active_epoch: Epoch::ZERO,
                max_active_epoch: Epoch::ZERO,
                balance: Amount::ZERO,
                application_permissions: ApplicationPermissions::default(),
            },
            Timestamp::from(0),
        );

        let test_blob1 = Blob::new_chain_description(&chain_description);
        let test_blob2 = Blob::new_data(vec![10, 20, 30]);
        let test_blob3 = Blob::new_data(vec![40, 50, 60]);

        // Testing blobs existence
        let blob_id1 = test_blob1.id();
        let blob_id2 = test_blob2.id();
        let blob_id3 = test_blob3.id();

        // Test blob existence before writing
        assert!(!storage.contains_blob(blob_id1).await?);
        assert!(!storage.contains_blob(blob_id2).await?);
        assert!(!storage.contains_blob(blob_id3).await?);

        // Test single blob write
        storage.write_blob(&test_blob1).await?;
        assert!(storage.contains_blob(blob_id1).await?);

        // Test multiple blob write (write_blobs)
        storage
            .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
            .await?;
        assert!(storage.contains_blob(blob_id2).await?);
        assert!(storage.contains_blob(blob_id3).await?);

        // Test single blob read
        let read_blob = storage.read_blob(blob_id1).await?;
        assert_eq!(read_blob.as_deref(), Some(&test_blob1));

        // Test multiple blob read (read_blobs)
        let blob_ids = vec![blob_id1, blob_id2, blob_id3];
        let read_blobs = storage.read_blobs(&blob_ids).await?;
        assert_eq!(read_blobs.len(), 3);

        // Verify each blob was read correctly
        assert_eq!(read_blobs[0].as_deref(), Some(&test_blob1));
        assert_eq!(read_blobs[1].as_deref(), Some(&test_blob2));
        assert_eq!(read_blobs[2].as_deref(), Some(&test_blob3));

        // Test missing blobs detection
        let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
        let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
        assert_eq!(missing_blobs, vec![missing_blob_id]);

        // Test maybe_write_blobs (should return false as blobs don't have blob states yet)
        let write_results = storage
            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
            .await?;
        assert_eq!(write_results, vec![false]);

        // Test blob state operations
        let blob_state1 = BlobState {
            last_used_by: None,
            chain_id: ChainId(CryptoHash::test_hash("chain1")),
            block_height: BlockHeight(0),
            epoch: Some(Epoch::ZERO),
        };
        let blob_state2 = BlobState {
            last_used_by: Some(CryptoHash::test_hash("cert")),
            chain_id: ChainId(CryptoHash::test_hash("chain2")),
            block_height: BlockHeight(1),
            epoch: Some(Epoch::from(1)),
        };

        // Test blob state existence before writing
        assert!(!storage.contains_blob_state(blob_id1).await?);
        assert!(!storage.contains_blob_state(blob_id2).await?);

        // Test blob state writing
        storage
            .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
            .await?;
        storage
            .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
            .await?;

        // Test blob state existence after writing
        assert!(storage.contains_blob_state(blob_id1).await?);
        assert!(storage.contains_blob_state(blob_id2).await?);

        // Test single blob state read
        let read_blob_state = storage.read_blob_state(blob_id1).await?;
        assert_eq!(read_blob_state, Some(blob_state1.clone()));

        // Test multiple blob state read (read_blob_states)
        let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
        assert_eq!(read_blob_states.len(), 2);

        // Verify blob states
        assert_eq!(read_blob_states[0], Some(blob_state1));
        assert_eq!(read_blob_states[1], Some(blob_state2));

        // Test maybe_write_blobs now that blob states exist (should return true)
        let write_results = storage
            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
            .await?;
        assert_eq!(write_results, vec![true]);

        Ok(())
    }
761
762    async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
763    where
764        S::Context: Send + Sync,
765    {
766        let cert_hash = CryptoHash::test_hash("certificate");
767
768        // Test certificate existence (should be false initially)
769        assert!(!storage.contains_certificate(cert_hash).await?);
770
771        // Test reading non-existent certificate
772        assert!(storage.read_certificate(cert_hash).await?.is_none());
773
774        // Test reading multiple certificates
775        let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
776        let certs_result = storage.read_certificates(&cert_hashes).await?;
777        assert_eq!(certs_result.len(), 2);
778        assert!(certs_result[0].is_none());
779        assert!(certs_result[1].is_none());
780
781        // Test raw certificate reading
782        let raw_certs_result = storage.read_certificates_raw(&cert_hashes).await?;
783        assert!(raw_certs_result.iter().all(|cert| cert.is_none())); // No certificates exist
784
785        // Test confirmed block reading
786        let block_hash = CryptoHash::test_hash("block");
787        let block_result = storage.read_confirmed_block(block_hash).await?;
788        assert!(block_result.is_none());
789
790        // Test write_blobs_and_certificate functionality
791        // Create test blobs
792        let test_blob1 = Blob::new_data(vec![1, 2, 3]);
793        let test_blob2 = Blob::new_data(vec![4, 5, 6]);
794        let blobs = vec![test_blob1, test_blob2];
795
796        // Create a test certificate using the working pattern from linera-indexer tests
797        let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));
798
799        // Create a minimal proposed block (genesis block)
800        let proposed_block = ProposedBlock {
801            epoch: Epoch::ZERO,
802            chain_id,
803            transactions: vec![],
804            previous_block_hash: None,
805            height: BlockHeight::ZERO,
806            authenticated_owner: None,
807            timestamp: Timestamp::default(),
808        };
809
810        // Create a minimal block execution outcome with proper BTreeMap types
811        let outcome = BlockExecutionOutcome {
812            messages: vec![],
813            state_hash: CryptoHash::default(),
814            oracle_responses: vec![],
815            events: vec![],
816            blobs: vec![],
817            operation_results: vec![],
818            previous_event_blocks: BTreeMap::new(),
819            previous_message_blocks: BTreeMap::new(),
820        };
821
822        let block = Block::new(proposed_block, outcome);
823        let confirmed_block = ConfirmedBlock::new(block);
824        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
825
826        // Test writing blobs and certificate together
827        storage
828            .write_blobs_and_certificate(&blobs, &certificate)
829            .await?;
830
831        // Verify the certificate was written
832        let cert_hash = certificate.hash();
833        assert!(storage.contains_certificate(cert_hash).await?);
834
835        // Verify the certificate can be read back
836        let read_certificate = storage.read_certificate(cert_hash).await?;
837        assert!(read_certificate.is_some());
838        assert_eq!(read_certificate.unwrap().hash(), cert_hash);
839
840        // Verify the blobs were written
841        for blob in &blobs {
842            assert!(storage.contains_blob(blob.id()).await?);
843        }
844
845        Ok(())
846    }
847
848    async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
849    where
850        S::Context: Send + Sync,
851    {
852        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
853        let stream_id = StreamId::system("test_stream");
854
855        // Test multiple events
856        let event_id1 = EventId {
857            chain_id,
858            stream_id: stream_id.clone(),
859            index: 0,
860        };
861        let event_id2 = EventId {
862            chain_id,
863            stream_id: stream_id.clone(),
864            index: 1,
865        };
866        let event_id3 = EventId {
867            chain_id,
868            stream_id: stream_id.clone(),
869            index: 2,
870        };
871
872        let event_data1 = vec![1, 2, 3];
873        let event_data2 = vec![4, 5, 6];
874        let event_data3 = vec![7, 8, 9];
875
876        // Test event existence before writing
877        assert!(!storage.contains_event(event_id1.clone()).await?);
878        assert!(!storage.contains_event(event_id2.clone()).await?);
879
880        // Write multiple events
881        storage
882            .write_events([
883                (event_id1.clone(), event_data1.clone()),
884                (event_id2.clone(), event_data2.clone()),
885                (event_id3.clone(), event_data3.clone()),
886            ])
887            .await?;
888
889        // Test event existence after writing
890        assert!(storage.contains_event(event_id1.clone()).await?);
891        assert!(storage.contains_event(event_id2.clone()).await?);
892        assert!(storage.contains_event(event_id3.clone()).await?);
893
894        // Test individual event reading
895        let read_event1 = storage.read_event(event_id1).await?;
896        assert_eq!(read_event1, Some(Arc::new(event_data1)));
897
898        let read_event2 = storage.read_event(event_id2).await?;
899        assert_eq!(read_event2, Some(Arc::new(event_data2)));
900
901        // Test reading events from index
902        let events_from_index = storage
903            .read_events_from_index(&chain_id, &stream_id, 1)
904            .await?;
905        assert!(events_from_index.len() >= 2); // Should contain events at index 1 and 2
906        Ok(())
907    }
908
909    async fn test_storage_network_description<S: Storage + Sync>(
910        storage: &S,
911    ) -> Result<(), ViewError>
912    where
913        S::Context: Send + Sync,
914    {
915        let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));
916
917        let network_desc = NetworkDescription {
918            name: "test_network".to_string(),
919            genesis_config_hash: CryptoHash::test_hash("genesis_config"),
920            genesis_timestamp: Timestamp::from(0),
921            genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
922            admin_chain_id,
923        };
924
925        // Test reading non-existent network description
926        assert!(storage.read_network_description().await?.is_none());
927
928        // Write network description
929        storage.write_network_description(&network_desc).await?;
930
931        // Test reading existing network description
932        let read_desc = storage.read_network_description().await?;
933        assert_eq!(read_desc, Some(network_desc));
934
935        Ok(())
936    }
937
938    /// Generic test function to test Storage trait features
939    #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
940    #[cfg_attr(feature = "dynamodb", test_case(DbStorage::<DynamoDbDatabase, _>::make_test_storage(None).await; "dynamo_db"))]
941    #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
942    #[test_log::test(tokio::test)]
943    async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
944    where
945        S::Context: Send + Sync,
946    {
947        test_storage_chain_exporter(&storage).await?;
948        test_storage_blob(&storage).await?;
949        test_storage_certificate(&storage).await?;
950        test_storage_event(&storage).await?;
951        test_storage_network_description(&storage).await?;
952        Ok(())
953    }
954}