1mod db_storage;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13 crypto::CryptoHash,
14 data_types::{
15 ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode,
16 NetworkDescription, Timestamp,
17 },
18 identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
19 vm::VmRuntime,
20};
21pub use linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS;
22use linera_chain::{
23 types::{ConfirmedBlock, ConfirmedBlockCertificate},
24 ChainError, ChainStateView,
25};
26use linera_execution::{
27 committee::Committee, BlobState, ExecutionError, ExecutionRuntimeConfig,
28 ExecutionRuntimeContext, SharedCommittees, TransactionTracker, UserContractCode,
29 UserServiceCode, WasmRuntime,
30};
31#[cfg(with_revm)]
32use linera_execution::{
33 evm::revm::{EvmContractModule, EvmServiceModule},
34 EvmRuntime,
35};
36#[cfg(with_wasm_runtime)]
37use linera_execution::{WasmContractModule, WasmServiceModule};
38use linera_views::{context::Context, views::RootView, ViewError};
39
40#[cfg(with_metrics)]
41pub use crate::db_storage::metrics;
42pub use crate::db_storage::{
43 ChainStatesFirstAssignment, DbStorage, StorageCacheConfig, StorageCaches, WallClock,
44};
45#[cfg(with_testing)]
46pub use crate::db_storage::{TestClock, DEFAULT_STORAGE_CACHE_CONFIG};
47
48pub const DEFAULT_NAMESPACE: &str = "default";
50
51#[cfg_attr(not(web), async_trait)]
53#[cfg_attr(web, async_trait(?Send))]
54pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
55 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
57
58 type Clock: Clock + Clone + Send + Sync;
60
61 type BlockExporterContext: Context<Extra = u32> + Clone;
63
64 fn clock(&self) -> &Self::Clock;
66
67 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
68
69 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
77
78 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
80
81 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
83
84 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
86
87 async fn read_confirmed_block(
89 &self,
90 hash: CryptoHash,
91 ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError>;
92
93 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
95 &self,
96 hashes: I,
97 ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError>;
98
99 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError>;
101
102 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError>;
104
105 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
107
108 async fn read_blob_states(
110 &self,
111 blob_ids: &[BlobId],
112 ) -> Result<Vec<Option<BlobState>>, ViewError>;
113
114 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
116
117 async fn write_blobs_and_certificate(
119 &self,
120 blobs: &[Blob],
121 certificate: &ConfirmedBlockCertificate,
122 ) -> Result<(), ViewError>;
123
124 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
127
128 async fn maybe_write_blob_states(
130 &self,
131 blob_ids: &[BlobId],
132 blob_state: BlobState,
133 ) -> Result<(), ViewError>;
134
135 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
137
138 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
140
141 fn cache_certificate(
148 &self,
149 certificate: ConfirmedBlockCertificate,
150 ) -> Arc<ConfirmedBlockCertificate>;
151
152 fn cache_blob(&self, blob: Blob) -> Arc<Blob>;
159
160 fn cache_confirmed_block(&self, block: ConfirmedBlock) -> Arc<ConfirmedBlock>;
167
168 async fn read_certificate(
170 &self,
171 hash: CryptoHash,
172 ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError>;
173
174 async fn read_certificates(
176 &self,
177 hashes: &[CryptoHash],
178 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
179
180 async fn read_certificates_raw(
186 &self,
187 hashes: &[CryptoHash],
188 ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
189
190 async fn read_certificates_by_heights(
194 &self,
195 chain_id: ChainId,
196 heights: &[BlockHeight],
197 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
198
199 async fn read_certificates_by_heights_raw(
204 &self,
205 chain_id: ChainId,
206 heights: &[BlockHeight],
207 ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
208
209 async fn read_certificate_hashes_by_heights(
213 &self,
214 chain_id: ChainId,
215 heights: &[BlockHeight],
216 ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
217
218 async fn read_event_block_heights(
221 &self,
222 event_ids: &[EventId],
223 ) -> Result<Vec<Option<BlockHeight>>, ViewError>;
224
225 async fn read_event(&self, id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError>;
227
228 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
230
231 async fn read_events_from_index(
233 &self,
234 chain_id: &ChainId,
235 stream_id: &StreamId,
236 start_index: u32,
237 ) -> Result<Vec<IndexAndEvent>, ViewError>;
238
239 async fn write_events(
241 &self,
242 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
243 ) -> Result<(), ViewError>;
244
245 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
247
248 async fn write_network_description(
250 &self,
251 information: &NetworkDescription,
252 ) -> Result<(), ViewError>;
253
254 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
262 where
263 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
264 {
265 let id = description.id();
266 self.write_blob(&Blob::new_chain_description(&description))
268 .await?;
269 let mut chain = self.load_chain(id).await?;
270 assert!(
271 !chain.is_active().await?,
272 "Attempting to create a chain twice"
273 );
274 let current_time = self.clock().current_time();
275 chain.initialize_if_needed(current_time).await?;
276 chain.save().await?;
277 Ok(())
278 }
279
280 fn wasm_runtime(&self) -> Option<WasmRuntime>;
282
283 async fn load_contract(
286 &self,
287 application_description: &ApplicationDescription,
288 txn_tracker: &TransactionTracker,
289 ) -> Result<UserContractCode, ExecutionError> {
290 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
291 let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
292 Some(content) => content.clone(),
293 None => self
294 .read_blob(contract_bytecode_blob_id)
295 .await?
296 .ok_or(ExecutionError::BlobsNotFound(vec![
297 contract_bytecode_blob_id,
298 ]))?
299 .content()
300 .clone(),
301 };
302 let compressed_contract_bytecode = CompressedBytecode {
303 compressed_bytes: content.into_arc_bytes(),
304 };
305 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
306 let contract_bytecode = self
307 .thread_pool()
308 .run_send((), move |()| async move {
309 compressed_contract_bytecode.decompress()
310 })
311 .await
312 .await??;
313 match application_description.module_id.vm_runtime {
314 VmRuntime::Wasm => {
315 cfg_if::cfg_if! {
316 if #[cfg(with_wasm_runtime)] {
317 let Some(wasm_runtime) = self.wasm_runtime() else {
318 panic!("A Wasm runtime is required to load user applications.");
319 };
320 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
321 .await?
322 .into())
323 } else {
324 panic!(
325 "A Wasm runtime is required to load user applications. \
326 Please enable the `wasmer` or the `wasmtime` feature flags \
327 when compiling `linera-storage`."
328 );
329 }
330 }
331 }
332 VmRuntime::Evm => {
333 cfg_if::cfg_if! {
334 if #[cfg(with_revm)] {
335 let evm_runtime = EvmRuntime::Revm;
336 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
337 .into())
338 } else {
339 panic!(
340 "An Evm runtime is required to load user applications. \
341 Please enable the `revm` feature flag \
342 when compiling `linera-storage`."
343 );
344 }
345 }
346 }
347 }
348 }
349
350 async fn load_service(
353 &self,
354 application_description: &ApplicationDescription,
355 txn_tracker: &TransactionTracker,
356 ) -> Result<UserServiceCode, ExecutionError> {
357 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
358 let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
359 Some(content) => content.clone(),
360 None => self
361 .read_blob(service_bytecode_blob_id)
362 .await?
363 .ok_or(ExecutionError::BlobsNotFound(vec![
364 service_bytecode_blob_id,
365 ]))?
366 .content()
367 .clone(),
368 };
369 let compressed_service_bytecode = CompressedBytecode {
370 compressed_bytes: content.into_arc_bytes(),
371 };
372 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
373 let service_bytecode = self
374 .thread_pool()
375 .run_send((), move |()| async move {
376 compressed_service_bytecode.decompress()
377 })
378 .await
379 .await??;
380 match application_description.module_id.vm_runtime {
381 VmRuntime::Wasm => {
382 cfg_if::cfg_if! {
383 if #[cfg(with_wasm_runtime)] {
384 let Some(wasm_runtime) = self.wasm_runtime() else {
385 panic!("A Wasm runtime is required to load user applications.");
386 };
387 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
388 .await?
389 .into())
390 } else {
391 panic!(
392 "A Wasm runtime is required to load user applications. \
393 Please enable the `wasmer` or the `wasmtime` feature flags \
394 when compiling `linera-storage`."
395 );
396 }
397 }
398 }
399 VmRuntime::Evm => {
400 cfg_if::cfg_if! {
401 if #[cfg(with_revm)] {
402 let evm_runtime = EvmRuntime::Revm;
403 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
404 .into())
405 } else {
406 panic!(
407 "An Evm runtime is required to load user applications. \
408 Please enable the `revm` feature flag \
409 when compiling `linera-storage`."
410 );
411 }
412 }
413 }
414 }
415 }
416
417 async fn block_exporter_context(
418 &self,
419 block_exporter_id: u32,
420 ) -> Result<Self::BlockExporterContext, ViewError>;
421
422 fn shared_committees(&self) -> &SharedCommittees;
424
425 async fn get_or_load_committee_by_hash(
428 &self,
429 hash: CryptoHash,
430 ) -> Result<Arc<Committee>, ExecutionError> {
431 if let Some(committee) = self.shared_committees().get(hash) {
432 return Ok(committee);
433 }
434 let blob_id = BlobId::new(hash, BlobType::Committee);
435 let blob = self
436 .read_blob(blob_id)
437 .await?
438 .ok_or(ExecutionError::BlobsNotFound(vec![blob_id]))?;
439 let committee = bcs::from_bytes(blob.bytes())?;
440 Ok(self.shared_committees().insert(hash, Arc::new(committee)))
441 }
442
443 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
445
446 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
448
449 async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
451}
452
453pub enum ResultReadCertificates {
455 Certificates(Vec<ConfirmedBlockCertificate>),
456 InvalidHashes(Vec<CryptoHash>),
457}
458
459impl ResultReadCertificates {
460 pub fn new(
462 certificates: Vec<Option<Arc<ConfirmedBlockCertificate>>>,
463 hashes: Vec<CryptoHash>,
464 ) -> Self {
465 let (certificates, invalid_hashes) = certificates
466 .into_iter()
467 .zip(hashes)
468 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
469 Some(cert) => itertools::Either::Left(Arc::unwrap_or_clone(cert)),
470 None => itertools::Either::Right(hash),
471 });
472 if invalid_hashes.is_empty() {
473 Self::Certificates(certificates)
474 } else {
475 Self::InvalidHashes(invalid_hashes)
476 }
477 }
478}
479
480#[derive(Clone)]
482pub struct ChainRuntimeContext<S> {
483 storage: S,
484 chain_id: ChainId,
485 thread_pool: Arc<linera_execution::ThreadPool>,
486 execution_runtime_config: ExecutionRuntimeConfig,
487 user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
488 user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
489}
490
491#[cfg_attr(not(web), async_trait)]
492#[cfg_attr(web, async_trait(?Send))]
493impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
494 fn chain_id(&self) -> ChainId {
495 self.chain_id
496 }
497
498 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
499 &self.thread_pool
500 }
501
502 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
503 self.execution_runtime_config
504 }
505
506 fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
507 &self.user_contracts
508 }
509
510 fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
511 &self.user_services
512 }
513
514 async fn get_user_contract(
515 &self,
516 description: &ApplicationDescription,
517 txn_tracker: &TransactionTracker,
518 ) -> Result<UserContractCode, ExecutionError> {
519 let application_id = description.into();
520 let pinned = self.user_contracts.pin_owned();
521 if let Some(contract) = pinned.get(&application_id) {
522 return Ok(contract.clone());
523 }
524 let contract = self.storage.load_contract(description, txn_tracker).await?;
525 pinned.insert(application_id, contract.clone());
526 Ok(contract)
527 }
528
529 async fn get_user_service(
530 &self,
531 description: &ApplicationDescription,
532 txn_tracker: &TransactionTracker,
533 ) -> Result<UserServiceCode, ExecutionError> {
534 let application_id = description.into();
535 let pinned = self.user_services.pin_owned();
536 if let Some(service) = pinned.get(&application_id) {
537 return Ok(service.clone());
538 }
539 let service = self.storage.load_service(description, txn_tracker).await?;
540 pinned.insert(application_id, service.clone());
541 Ok(service)
542 }
543
544 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError> {
545 self.storage.read_blob(blob_id).await
546 }
547
548 async fn get_event(&self, event_id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError> {
549 self.storage.read_event(event_id).await
550 }
551
552 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
553 self.storage.read_network_description().await
554 }
555
556 async fn get_or_load_committee_by_hash(
557 &self,
558 hash: CryptoHash,
559 ) -> Result<Arc<Committee>, ExecutionError> {
560 self.storage.get_or_load_committee_by_hash(hash).await
561 }
562
563 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
564 self.storage.contains_blob(blob_id).await
565 }
566
567 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
568 self.storage.contains_event(event_id).await
569 }
570
571 #[cfg(with_testing)]
572 async fn add_blobs(
573 &self,
574 blobs: impl IntoIterator<Item = Blob> + Send,
575 ) -> Result<(), ViewError> {
576 let blobs = Vec::from_iter(blobs);
577 self.storage.write_blobs(&blobs).await
578 }
579
580 #[cfg(with_testing)]
581 async fn add_events(
582 &self,
583 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
584 ) -> Result<(), ViewError> {
585 self.storage.write_events(events).await
586 }
587}
588
589#[cfg_attr(not(web), async_trait)]
591#[cfg_attr(web, async_trait(?Send))]
592pub trait Clock {
593 fn current_time(&self) -> Timestamp;
594
595 async fn sleep_until(&self, timestamp: Timestamp);
596}
597
598#[cfg(test)]
599mod tests {
600 use std::collections::BTreeMap;
601
602 use linera_base::{
603 crypto::{AccountPublicKey, CryptoHash},
604 data_types::{
605 Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
606 Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
607 },
608 identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
609 ownership::ChainOwnership,
610 };
611 use linera_chain::{
612 block::{Block, ConfirmedBlock},
613 data_types::{BlockExecutionOutcome, ProposedBlock},
614 };
615 use linera_execution::BlobState;
616 #[cfg(feature = "dynamodb")]
617 use linera_views::dynamo_db::DynamoDbDatabase;
618 #[cfg(feature = "scylladb")]
619 use linera_views::scylla_db::ScyllaDbDatabase;
620 use linera_views::{memory::MemoryDatabase, ViewError};
621 use test_case::test_case;
622
623 use super::*;
624 use crate::db_storage::DbStorage;
625
626 async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
628 where
629 S::Context: Send + Sync,
630 {
631 let _current_time = storage.clock().current_time();
633 let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));
634
635 let _chain_view = storage.load_chain(test_chain_id).await?;
637
638 let _block_exporter_context = storage.block_exporter_context(0).await?;
640 Ok(())
641 }
642
643 async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
644 where
645 S::Context: Send + Sync,
646 {
647 let chain_description = ChainDescription::new(
649 ChainOrigin::Root(0),
650 InitialChainConfig {
651 ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
652 epoch: Epoch::ZERO,
653 min_active_epoch: Epoch::ZERO,
654 max_active_epoch: Epoch::ZERO,
655 balance: Amount::ZERO,
656 application_permissions: ApplicationPermissions::default(),
657 },
658 Timestamp::from(0),
659 );
660
661 let test_blob1 = Blob::new_chain_description(&chain_description);
662 let test_blob2 = Blob::new_data(vec![10, 20, 30]);
663 let test_blob3 = Blob::new_data(vec![40, 50, 60]);
664
665 let blob_id1 = test_blob1.id();
667 let blob_id2 = test_blob2.id();
668 let blob_id3 = test_blob3.id();
669
670 assert!(!storage.contains_blob(blob_id1).await?);
672 assert!(!storage.contains_blob(blob_id2).await?);
673 assert!(!storage.contains_blob(blob_id3).await?);
674
675 storage.write_blob(&test_blob1).await?;
677 assert!(storage.contains_blob(blob_id1).await?);
678
679 storage
681 .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
682 .await?;
683 assert!(storage.contains_blob(blob_id2).await?);
684 assert!(storage.contains_blob(blob_id3).await?);
685
686 let read_blob = storage.read_blob(blob_id1).await?;
688 assert_eq!(read_blob.as_deref(), Some(&test_blob1));
689
690 let blob_ids = vec![blob_id1, blob_id2, blob_id3];
692 let read_blobs = storage.read_blobs(&blob_ids).await?;
693 assert_eq!(read_blobs.len(), 3);
694
695 assert_eq!(read_blobs[0].as_deref(), Some(&test_blob1));
697 assert_eq!(read_blobs[1].as_deref(), Some(&test_blob2));
698 assert_eq!(read_blobs[2].as_deref(), Some(&test_blob3));
699
700 let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
702 let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
703 assert_eq!(missing_blobs, vec![missing_blob_id]);
704
705 let write_results = storage
707 .maybe_write_blobs(std::slice::from_ref(&test_blob1))
708 .await?;
709 assert_eq!(write_results, vec![false]);
710
711 let blob_state1 = BlobState {
713 last_used_by: None,
714 chain_id: ChainId(CryptoHash::test_hash("chain1")),
715 block_height: BlockHeight(0),
716 epoch: Some(Epoch::ZERO),
717 };
718 let blob_state2 = BlobState {
719 last_used_by: Some(CryptoHash::test_hash("cert")),
720 chain_id: ChainId(CryptoHash::test_hash("chain2")),
721 block_height: BlockHeight(1),
722 epoch: Some(Epoch::from(1)),
723 };
724
725 assert!(!storage.contains_blob_state(blob_id1).await?);
727 assert!(!storage.contains_blob_state(blob_id2).await?);
728
729 storage
731 .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
732 .await?;
733 storage
734 .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
735 .await?;
736
737 assert!(storage.contains_blob_state(blob_id1).await?);
739 assert!(storage.contains_blob_state(blob_id2).await?);
740
741 let read_blob_state = storage.read_blob_state(blob_id1).await?;
743 assert_eq!(read_blob_state, Some(blob_state1.clone()));
744
745 let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
747 assert_eq!(read_blob_states.len(), 2);
748
749 assert_eq!(read_blob_states[0], Some(blob_state1));
751 assert_eq!(read_blob_states[1], Some(blob_state2));
752
753 let write_results = storage
755 .maybe_write_blobs(std::slice::from_ref(&test_blob1))
756 .await?;
757 assert_eq!(write_results, vec![true]);
758
759 Ok(())
760 }
761
762 async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
763 where
764 S::Context: Send + Sync,
765 {
766 let cert_hash = CryptoHash::test_hash("certificate");
767
768 assert!(!storage.contains_certificate(cert_hash).await?);
770
771 assert!(storage.read_certificate(cert_hash).await?.is_none());
773
774 let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
776 let certs_result = storage.read_certificates(&cert_hashes).await?;
777 assert_eq!(certs_result.len(), 2);
778 assert!(certs_result[0].is_none());
779 assert!(certs_result[1].is_none());
780
781 let raw_certs_result = storage.read_certificates_raw(&cert_hashes).await?;
783 assert!(raw_certs_result.iter().all(|cert| cert.is_none())); let block_hash = CryptoHash::test_hash("block");
787 let block_result = storage.read_confirmed_block(block_hash).await?;
788 assert!(block_result.is_none());
789
790 let test_blob1 = Blob::new_data(vec![1, 2, 3]);
793 let test_blob2 = Blob::new_data(vec![4, 5, 6]);
794 let blobs = vec![test_blob1, test_blob2];
795
796 let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));
798
799 let proposed_block = ProposedBlock {
801 epoch: Epoch::ZERO,
802 chain_id,
803 transactions: vec![],
804 previous_block_hash: None,
805 height: BlockHeight::ZERO,
806 authenticated_owner: None,
807 timestamp: Timestamp::default(),
808 };
809
810 let outcome = BlockExecutionOutcome {
812 messages: vec![],
813 state_hash: CryptoHash::default(),
814 oracle_responses: vec![],
815 events: vec![],
816 blobs: vec![],
817 operation_results: vec![],
818 previous_event_blocks: BTreeMap::new(),
819 previous_message_blocks: BTreeMap::new(),
820 };
821
822 let block = Block::new(proposed_block, outcome);
823 let confirmed_block = ConfirmedBlock::new(block);
824 let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
825
826 storage
828 .write_blobs_and_certificate(&blobs, &certificate)
829 .await?;
830
831 let cert_hash = certificate.hash();
833 assert!(storage.contains_certificate(cert_hash).await?);
834
835 let read_certificate = storage.read_certificate(cert_hash).await?;
837 assert!(read_certificate.is_some());
838 assert_eq!(read_certificate.unwrap().hash(), cert_hash);
839
840 for blob in &blobs {
842 assert!(storage.contains_blob(blob.id()).await?);
843 }
844
845 Ok(())
846 }
847
848 async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
849 where
850 S::Context: Send + Sync,
851 {
852 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
853 let stream_id = StreamId::system("test_stream");
854
855 let event_id1 = EventId {
857 chain_id,
858 stream_id: stream_id.clone(),
859 index: 0,
860 };
861 let event_id2 = EventId {
862 chain_id,
863 stream_id: stream_id.clone(),
864 index: 1,
865 };
866 let event_id3 = EventId {
867 chain_id,
868 stream_id: stream_id.clone(),
869 index: 2,
870 };
871
872 let event_data1 = vec![1, 2, 3];
873 let event_data2 = vec![4, 5, 6];
874 let event_data3 = vec![7, 8, 9];
875
876 assert!(!storage.contains_event(event_id1.clone()).await?);
878 assert!(!storage.contains_event(event_id2.clone()).await?);
879
880 storage
882 .write_events([
883 (event_id1.clone(), event_data1.clone()),
884 (event_id2.clone(), event_data2.clone()),
885 (event_id3.clone(), event_data3.clone()),
886 ])
887 .await?;
888
889 assert!(storage.contains_event(event_id1.clone()).await?);
891 assert!(storage.contains_event(event_id2.clone()).await?);
892 assert!(storage.contains_event(event_id3.clone()).await?);
893
894 let read_event1 = storage.read_event(event_id1).await?;
896 assert_eq!(read_event1, Some(Arc::new(event_data1)));
897
898 let read_event2 = storage.read_event(event_id2).await?;
899 assert_eq!(read_event2, Some(Arc::new(event_data2)));
900
901 let events_from_index = storage
903 .read_events_from_index(&chain_id, &stream_id, 1)
904 .await?;
905 assert!(events_from_index.len() >= 2); Ok(())
907 }
908
909 async fn test_storage_network_description<S: Storage + Sync>(
910 storage: &S,
911 ) -> Result<(), ViewError>
912 where
913 S::Context: Send + Sync,
914 {
915 let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));
916
917 let network_desc = NetworkDescription {
918 name: "test_network".to_string(),
919 genesis_config_hash: CryptoHash::test_hash("genesis_config"),
920 genesis_timestamp: Timestamp::from(0),
921 genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
922 admin_chain_id,
923 };
924
925 assert!(storage.read_network_description().await?.is_none());
927
928 storage.write_network_description(&network_desc).await?;
930
931 let read_desc = storage.read_network_description().await?;
933 assert_eq!(read_desc, Some(network_desc));
934
935 Ok(())
936 }
937
938 #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
940 #[cfg_attr(feature = "dynamodb", test_case(DbStorage::<DynamoDbDatabase, _>::make_test_storage(None).await; "dynamo_db"))]
941 #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
942 #[test_log::test(tokio::test)]
943 async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
944 where
945 S::Context: Send + Sync,
946 {
947 test_storage_chain_exporter(&storage).await?;
948 test_storage_blob(&storage).await?;
949 test_storage_certificate(&storage).await?;
950 test_storage_event(&storage).await?;
951 test_storage_network_description(&storage).await?;
952 Ok(())
953 }
954}