1mod db_storage;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13 crypto::CryptoHash,
14 data_types::{
15 ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode,
16 NetworkDescription, TimeDelta, Timestamp,
17 },
18 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
19 vm::VmRuntime,
20};
21use linera_chain::{
22 types::{ConfirmedBlock, ConfirmedBlockCertificate},
23 ChainError, ChainStateView,
24};
25#[cfg(with_revm)]
26use linera_execution::{
27 evm::revm::{EvmContractModule, EvmServiceModule},
28 EvmRuntime,
29};
30use linera_execution::{
31 BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker,
32 UserContractCode, UserServiceCode, WasmRuntime,
33};
34#[cfg(with_wasm_runtime)]
35use linera_execution::{WasmContractModule, WasmServiceModule};
36use linera_views::{context::Context, views::RootView, ViewError};
37
38#[cfg(with_metrics)]
39pub use crate::db_storage::metrics;
40#[cfg(with_testing)]
41pub use crate::db_storage::TestClock;
42pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
43
/// Name of the namespace used by default.
// NOTE(review): the consumers of this constant are not visible in this file — presumably
// it is the fallback storage namespace when none is configured; confirm against callers.
pub const DEFAULT_NAMESPACE: &str = "default";
46
47#[cfg_attr(not(web), async_trait)]
49#[cfg_attr(web, async_trait(?Send))]
50pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
51 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
53
54 type Clock: Clock + Clone + Send + Sync;
56
57 type BlockExporterContext: Context<Extra = u32> + Clone;
59
60 fn clock(&self) -> &Self::Clock;
62
63 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
64
65 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
73
74 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
76
77 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
79
80 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
82
83 async fn read_confirmed_block(
85 &self,
86 hash: CryptoHash,
87 ) -> Result<Option<ConfirmedBlock>, ViewError>;
88
89 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
91 &self,
92 hashes: I,
93 ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError>;
94
95 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
97
98 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
100
101 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
103
104 async fn read_blob_states(
106 &self,
107 blob_ids: &[BlobId],
108 ) -> Result<Vec<Option<BlobState>>, ViewError>;
109
110 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
112
113 async fn write_blobs_and_certificate(
115 &self,
116 blobs: &[Blob],
117 certificate: &ConfirmedBlockCertificate,
118 ) -> Result<(), ViewError>;
119
120 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
123
124 async fn maybe_write_blob_states(
126 &self,
127 blob_ids: &[BlobId],
128 blob_state: BlobState,
129 ) -> Result<(), ViewError>;
130
131 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
133
134 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
136
137 async fn read_certificate(
139 &self,
140 hash: CryptoHash,
141 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
142
143 async fn read_certificates(
145 &self,
146 hashes: &[CryptoHash],
147 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
148
149 async fn read_certificates_raw(
155 &self,
156 hashes: &[CryptoHash],
157 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
158
159 async fn read_certificates_by_heights(
163 &self,
164 chain_id: ChainId,
165 heights: &[BlockHeight],
166 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
167
168 async fn read_certificates_by_heights_raw(
173 &self,
174 chain_id: ChainId,
175 heights: &[BlockHeight],
176 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
177
178 async fn read_certificate_hashes_by_heights(
182 &self,
183 chain_id: ChainId,
184 heights: &[BlockHeight],
185 ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
186
187 async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
189
190 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
192
193 async fn read_events_from_index(
195 &self,
196 chain_id: &ChainId,
197 stream_id: &StreamId,
198 start_index: u32,
199 ) -> Result<Vec<IndexAndEvent>, ViewError>;
200
201 async fn write_events(
203 &self,
204 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
205 ) -> Result<(), ViewError>;
206
207 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
209
210 async fn write_network_description(
212 &self,
213 information: &NetworkDescription,
214 ) -> Result<(), ViewError>;
215
216 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
224 where
225 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
226 {
227 let id = description.id();
228 self.write_blob(&Blob::new_chain_description(&description))
230 .await?;
231 let mut chain = self.load_chain(id).await?;
232 assert!(!chain.is_active(), "Attempting to create a chain twice");
233 let current_time = self.clock().current_time();
234 chain.initialize_if_needed(current_time).await?;
235 chain.save().await?;
236 Ok(())
237 }
238
239 fn wasm_runtime(&self) -> Option<WasmRuntime>;
241
242 async fn load_contract(
245 &self,
246 application_description: &ApplicationDescription,
247 txn_tracker: &TransactionTracker,
248 ) -> Result<UserContractCode, ExecutionError> {
249 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
250 let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
251 Some(content) => content.clone(),
252 None => self
253 .read_blob(contract_bytecode_blob_id)
254 .await?
255 .ok_or(ExecutionError::BlobsNotFound(vec![
256 contract_bytecode_blob_id,
257 ]))?
258 .into_content(),
259 };
260 let compressed_contract_bytecode = CompressedBytecode {
261 compressed_bytes: content.into_arc_bytes(),
262 };
263 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
264 let contract_bytecode = self
265 .thread_pool()
266 .run_send((), move |()| async move {
267 compressed_contract_bytecode.decompress()
268 })
269 .await
270 .await??;
271 match application_description.module_id.vm_runtime {
272 VmRuntime::Wasm => {
273 cfg_if::cfg_if! {
274 if #[cfg(with_wasm_runtime)] {
275 let Some(wasm_runtime) = self.wasm_runtime() else {
276 panic!("A Wasm runtime is required to load user applications.");
277 };
278 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
279 .await?
280 .into())
281 } else {
282 panic!(
283 "A Wasm runtime is required to load user applications. \
284 Please enable the `wasmer` or the `wasmtime` feature flags \
285 when compiling `linera-storage`."
286 );
287 }
288 }
289 }
290 VmRuntime::Evm => {
291 cfg_if::cfg_if! {
292 if #[cfg(with_revm)] {
293 let evm_runtime = EvmRuntime::Revm;
294 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
295 .into())
296 } else {
297 panic!(
298 "An Evm runtime is required to load user applications. \
299 Please enable the `revm` feature flag \
300 when compiling `linera-storage`."
301 );
302 }
303 }
304 }
305 }
306 }
307
308 async fn load_service(
311 &self,
312 application_description: &ApplicationDescription,
313 txn_tracker: &TransactionTracker,
314 ) -> Result<UserServiceCode, ExecutionError> {
315 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
316 let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
317 Some(content) => content.clone(),
318 None => self
319 .read_blob(service_bytecode_blob_id)
320 .await?
321 .ok_or(ExecutionError::BlobsNotFound(vec![
322 service_bytecode_blob_id,
323 ]))?
324 .into_content(),
325 };
326 let compressed_service_bytecode = CompressedBytecode {
327 compressed_bytes: content.into_arc_bytes(),
328 };
329 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
330 let service_bytecode = self
331 .thread_pool()
332 .run_send((), move |()| async move {
333 compressed_service_bytecode.decompress()
334 })
335 .await
336 .await??;
337 match application_description.module_id.vm_runtime {
338 VmRuntime::Wasm => {
339 cfg_if::cfg_if! {
340 if #[cfg(with_wasm_runtime)] {
341 let Some(wasm_runtime) = self.wasm_runtime() else {
342 panic!("A Wasm runtime is required to load user applications.");
343 };
344 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
345 .await?
346 .into())
347 } else {
348 panic!(
349 "A Wasm runtime is required to load user applications. \
350 Please enable the `wasmer` or the `wasmtime` feature flags \
351 when compiling `linera-storage`."
352 );
353 }
354 }
355 }
356 VmRuntime::Evm => {
357 cfg_if::cfg_if! {
358 if #[cfg(with_revm)] {
359 let evm_runtime = EvmRuntime::Revm;
360 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
361 .into())
362 } else {
363 panic!(
364 "An Evm runtime is required to load user applications. \
365 Please enable the `revm` feature flag \
366 when compiling `linera-storage`."
367 );
368 }
369 }
370 }
371 }
372 }
373
374 async fn block_exporter_context(
375 &self,
376 block_exporter_id: u32,
377 ) -> Result<Self::BlockExporterContext, ViewError>;
378
379 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
381
382 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
384
385 async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
387}
388
/// Outcome of reading a batch of certificates by hash, as built by
/// `ResultReadCertificates::new`.
pub enum ResultReadCertificates {
    /// Every requested certificate was found.
    Certificates(Vec<ConfirmedBlockCertificate>),
    /// At least one certificate was missing; holds the hashes that had no certificate.
    InvalidHashes(Vec<CryptoHash>),
}
394
395impl ResultReadCertificates {
396 pub fn new(
398 certificates: Vec<Option<ConfirmedBlockCertificate>>,
399 hashes: Vec<CryptoHash>,
400 ) -> Self {
401 let (certificates, invalid_hashes) = certificates
402 .into_iter()
403 .zip(hashes)
404 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
405 Some(cert) => itertools::Either::Left(cert),
406 None => itertools::Either::Right(hash),
407 });
408 if invalid_hashes.is_empty() {
409 Self::Certificates(certificates)
410 } else {
411 Self::InvalidHashes(invalid_hashes)
412 }
413 }
414}
415
/// Outcome of reading a batch of confirmed blocks by hash, as built by
/// `ResultReadConfirmedBlocks::new`.
pub enum ResultReadConfirmedBlocks {
    /// Every requested block was found.
    Blocks(Vec<ConfirmedBlock>),
    /// At least one block was missing; holds the hashes that had no block.
    InvalidHashes(Vec<CryptoHash>),
}
421
422impl ResultReadConfirmedBlocks {
423 pub fn new(blocks: Vec<Option<ConfirmedBlock>>, hashes: Vec<CryptoHash>) -> Self {
425 let (blocks, invalid_hashes) = blocks
426 .into_iter()
427 .zip(hashes)
428 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(block, hash)| match block {
429 Some(block) => itertools::Either::Left(block),
430 None => itertools::Either::Right(hash),
431 });
432 if invalid_hashes.is_empty() {
433 Self::Blocks(blocks)
434 } else {
435 Self::InvalidHashes(invalid_hashes)
436 }
437 }
438}
439
/// Runtime context attached to the views of one chain; it implements
/// `ExecutionRuntimeContext` on top of a storage `S`.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// The storage that blob, event and bytecode lookups are delegated to.
    storage: S,
    /// The ID of the chain this context belongs to.
    chain_id: ChainId,
    /// The thread pool handed out through `thread_pool()`.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The execution runtime configuration handed out through `execution_runtime_config()`.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Contract code already loaded, keyed by application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Service code already loaded, keyed by application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
}
450
451#[cfg_attr(not(web), async_trait)]
452#[cfg_attr(web, async_trait(?Send))]
453impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
454 fn chain_id(&self) -> ChainId {
455 self.chain_id
456 }
457
458 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
459 &self.thread_pool
460 }
461
462 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
463 self.execution_runtime_config
464 }
465
466 fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
467 &self.user_contracts
468 }
469
470 fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
471 &self.user_services
472 }
473
474 async fn get_user_contract(
475 &self,
476 description: &ApplicationDescription,
477 txn_tracker: &TransactionTracker,
478 ) -> Result<UserContractCode, ExecutionError> {
479 let application_id = description.into();
480 let pinned = self.user_contracts.pin_owned();
481 if let Some(contract) = pinned.get(&application_id) {
482 return Ok(contract.clone());
483 }
484 let contract = self.storage.load_contract(description, txn_tracker).await?;
485 pinned.insert(application_id, contract.clone());
486 Ok(contract)
487 }
488
489 async fn get_user_service(
490 &self,
491 description: &ApplicationDescription,
492 txn_tracker: &TransactionTracker,
493 ) -> Result<UserServiceCode, ExecutionError> {
494 let application_id = description.into();
495 let pinned = self.user_services.pin_owned();
496 if let Some(service) = pinned.get(&application_id) {
497 return Ok(service.clone());
498 }
499 let service = self.storage.load_service(description, txn_tracker).await?;
500 pinned.insert(application_id, service.clone());
501 Ok(service)
502 }
503
504 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
505 self.storage.read_blob(blob_id).await
506 }
507
508 async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
509 self.storage.read_event(event_id).await
510 }
511
512 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
513 self.storage.read_network_description().await
514 }
515
516 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
517 self.storage.contains_blob(blob_id).await
518 }
519
520 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
521 self.storage.contains_event(event_id).await
522 }
523
524 #[cfg(with_testing)]
525 async fn add_blobs(
526 &self,
527 blobs: impl IntoIterator<Item = Blob> + Send,
528 ) -> Result<(), ViewError> {
529 let blobs = Vec::from_iter(blobs);
530 self.storage.write_blobs(&blobs).await
531 }
532
533 #[cfg(with_testing)]
534 async fn add_events(
535 &self,
536 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
537 ) -> Result<(), ViewError> {
538 self.storage.write_events(events).await
539 }
540}
541
/// A source of time: reports the current [`Timestamp`] and offers asynchronous waiting.
// NOTE(review): no implementation is visible in this file; the exact waiting semantics
// (e.g. real versus simulated time) are implementation-defined.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Waits asynchronously for the given time delta.
    async fn sleep(&self, delta: TimeDelta);

    /// Waits asynchronously until the given timestamp.
    async fn sleep_until(&self, timestamp: Timestamp);
}
552
/// Storage tests, written against the `Storage` trait and run for each backend listed on
/// `test_storage_features`.
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use linera_base::{
        crypto::{AccountPublicKey, CryptoHash},
        data_types::{
            Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
            Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
        },
        identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
        ownership::ChainOwnership,
    };
    use linera_chain::{
        block::{Block, ConfirmedBlock},
        data_types::{BlockExecutionOutcome, ProposedBlock},
    };
    use linera_execution::BlobState;
    #[cfg(feature = "dynamodb")]
    use linera_views::dynamo_db::DynamoDbDatabase;
    #[cfg(feature = "scylladb")]
    use linera_views::scylla_db::ScyllaDbDatabase;
    use linera_views::{memory::MemoryDatabase, ViewError};
    use test_case::test_case;

    use super::*;
    use crate::db_storage::DbStorage;

    /// Smoke test: the clock, chain loading and the block exporter context are reachable
    /// and return without error. The returned values are not inspected.
    async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
    where
        S::Context: Send + Sync,
    {
        // Reading the clock must not fail.
        let _current_time = storage.clock().current_time();
        let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Loading a chain view must succeed.
        let _chain_view = storage.load_chain(test_chain_id).await?;

        // Obtaining the context of block exporter 0 must succeed.
        let _block_exporter_context = storage.block_exporter_context(0).await?;
        Ok(())
    }

    /// Exercises the blob and blob-state operations: existence checks, single and batch
    /// writes and reads, missing-blob detection, and the conditional `maybe_write_*` calls.
    async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
    where
        S::Context: Send + Sync,
    {
        // One chain-description blob and two data blobs.
        let chain_description = ChainDescription::new(
            ChainOrigin::Root(0),
            InitialChainConfig {
                ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
                epoch: Epoch::ZERO,
                min_active_epoch: Epoch::ZERO,
                max_active_epoch: Epoch::ZERO,
                balance: Amount::ZERO,
                application_permissions: ApplicationPermissions::default(),
            },
            Timestamp::from(0),
        );

        let test_blob1 = Blob::new_chain_description(&chain_description);
        let test_blob2 = Blob::new_data(vec![10, 20, 30]);
        let test_blob3 = Blob::new_data(vec![40, 50, 60]);

        let blob_id1 = test_blob1.id();
        let blob_id2 = test_blob2.id();
        let blob_id3 = test_blob3.id();

        // Nothing has been written yet.
        assert!(!storage.contains_blob(blob_id1).await?);
        assert!(!storage.contains_blob(blob_id2).await?);
        assert!(!storage.contains_blob(blob_id3).await?);

        // Single-blob write.
        storage.write_blob(&test_blob1).await?;
        assert!(storage.contains_blob(blob_id1).await?);

        // Batch write.
        storage
            .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
            .await?;
        assert!(storage.contains_blob(blob_id2).await?);
        assert!(storage.contains_blob(blob_id3).await?);

        // Single-blob read.
        let read_blob = storage.read_blob(blob_id1).await?;
        assert_eq!(read_blob, Some(test_blob1.clone()));

        // Batch read: results come back in the order of the requested IDs.
        let blob_ids = vec![blob_id1, blob_id2, blob_id3];
        let read_blobs = storage.read_blobs(&blob_ids).await?;
        assert_eq!(read_blobs.len(), 3);

        assert_eq!(read_blobs[0], Some(test_blob1.clone()));
        assert_eq!(read_blobs[1], Some(test_blob2));
        assert_eq!(read_blobs[2], Some(test_blob3));

        // Only the never-written ID is reported as missing.
        let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
        let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
        assert_eq!(missing_blobs, vec![missing_blob_id]);

        // No blob state has been written for `blob_id1` so far (checked below), and the
        // conditional write is expected to report `false`.
        let write_results = storage
            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
            .await?;
        assert_eq!(write_results, vec![false]);

        let blob_state1 = BlobState {
            last_used_by: None,
            chain_id: ChainId(CryptoHash::test_hash("chain1")),
            block_height: BlockHeight(0),
            epoch: Some(Epoch::ZERO),
        };
        let blob_state2 = BlobState {
            last_used_by: Some(CryptoHash::test_hash("cert")),
            chain_id: ChainId(CryptoHash::test_hash("chain2")),
            block_height: BlockHeight(1),
            epoch: Some(Epoch::from(1)),
        };

        // No blob states exist before they are written.
        assert!(!storage.contains_blob_state(blob_id1).await?);
        assert!(!storage.contains_blob_state(blob_id2).await?);

        // Write one state per blob ID.
        storage
            .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
            .await?;
        storage
            .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
            .await?;

        assert!(storage.contains_blob_state(blob_id1).await?);
        assert!(storage.contains_blob_state(blob_id2).await?);

        // Single blob-state read.
        let read_blob_state = storage.read_blob_state(blob_id1).await?;
        assert_eq!(read_blob_state, Some(blob_state1.clone()));

        // Batch blob-state read: results come back in the order of the requested IDs.
        let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
        assert_eq!(read_blob_states.len(), 2);

        assert_eq!(read_blob_states[0], Some(blob_state1));
        assert_eq!(read_blob_states[1], Some(blob_state2));

        // Now that a blob state exists for `blob_id1`, the conditional write is expected
        // to report `true`.
        let write_results = storage
            .maybe_write_blobs(std::slice::from_ref(&test_blob1))
            .await?;
        assert_eq!(write_results, vec![true]);

        Ok(())
    }

    /// Exercises the certificate operations: lookups of unknown hashes, then writing a
    /// certificate together with blobs and reading it back.
    async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
    where
        S::Context: Send + Sync,
    {
        let cert_hash = CryptoHash::test_hash("certificate");

        // An unknown hash is not contained...
        assert!(!storage.contains_certificate(cert_hash).await?);

        // ...and reads as `None`.
        assert!(storage.read_certificate(cert_hash).await?.is_none());

        // Batch read of unknown hashes: one `None` per requested hash.
        let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
        let certs_result = storage.read_certificates(&cert_hashes).await?;
        assert_eq!(certs_result.len(), 2);
        assert!(certs_result[0].is_none());
        assert!(certs_result[1].is_none());

        // The raw read of the same unknown hashes also yields only `None`.
        let raw_certs_result = storage.read_certificates_raw(&cert_hashes).await?;
        assert!(raw_certs_result.iter().all(|cert| cert.is_none()));

        // An unknown confirmed block reads as `None`.
        let block_hash = CryptoHash::test_hash("block");
        let block_result = storage.read_confirmed_block(block_hash).await?;
        assert!(block_result.is_none());

        // Blobs to be written together with the certificate.
        let test_blob1 = Blob::new_data(vec![1, 2, 3]);
        let test_blob2 = Blob::new_data(vec![4, 5, 6]);
        let blobs = vec![test_blob1, test_blob2];

        let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));

        // A minimal proposed block with no transactions.
        let proposed_block = ProposedBlock {
            epoch: Epoch::ZERO,
            chain_id,
            transactions: vec![],
            previous_block_hash: None,
            height: BlockHeight::ZERO,
            authenticated_owner: None,
            timestamp: Timestamp::default(),
        };

        // An empty execution outcome for that block.
        let outcome = BlockExecutionOutcome {
            messages: vec![],
            state_hash: CryptoHash::default(),
            oracle_responses: vec![],
            events: vec![],
            blobs: vec![],
            operation_results: vec![],
            previous_event_blocks: BTreeMap::new(),
            previous_message_blocks: BTreeMap::new(),
        };

        // The certificate is built with an empty signature list.
        let block = Block::new(proposed_block, outcome);
        let confirmed_block = ConfirmedBlock::new(block);
        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);

        storage
            .write_blobs_and_certificate(&blobs, &certificate)
            .await?;

        // The certificate is now found under its own hash (shadows the earlier test hash).
        let cert_hash = certificate.hash();
        assert!(storage.contains_certificate(cert_hash).await?);

        let read_certificate = storage.read_certificate(cert_hash).await?;
        assert!(read_certificate.is_some());
        assert_eq!(read_certificate.unwrap().hash(), cert_hash);

        // The blobs written alongside the certificate are present too.
        for blob in &blobs {
            assert!(storage.contains_blob(blob.id()).await?);
        }

        Ok(())
    }

    /// Exercises the event operations: existence checks, batch write, single reads and
    /// reading a stream from a starting index.
    async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
    where
        S::Context: Send + Sync,
    {
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let stream_id = StreamId::system("test_stream");

        // Three consecutive events (indices 0, 1, 2) on the same stream.
        let event_id1 = EventId {
            chain_id,
            stream_id: stream_id.clone(),
            index: 0,
        };
        let event_id2 = EventId {
            chain_id,
            stream_id: stream_id.clone(),
            index: 1,
        };
        let event_id3 = EventId {
            chain_id,
            stream_id: stream_id.clone(),
            index: 2,
        };

        let event_data1 = vec![1, 2, 3];
        let event_data2 = vec![4, 5, 6];
        let event_data3 = vec![7, 8, 9];

        // Nothing has been written yet.
        assert!(!storage.contains_event(event_id1.clone()).await?);
        assert!(!storage.contains_event(event_id2.clone()).await?);

        // Write all three events in one call.
        storage
            .write_events([
                (event_id1.clone(), event_data1.clone()),
                (event_id2.clone(), event_data2.clone()),
                (event_id3.clone(), event_data3.clone()),
            ])
            .await?;

        assert!(storage.contains_event(event_id1.clone()).await?);
        assert!(storage.contains_event(event_id2.clone()).await?);
        assert!(storage.contains_event(event_id3.clone()).await?);

        // Single reads return the payloads that were written.
        let read_event1 = storage.read_event(event_id1).await?;
        assert_eq!(read_event1, Some(event_data1));

        let read_event2 = storage.read_event(event_id2).await?;
        assert_eq!(read_event2, Some(event_data2));

        // Reading from index 1 must return at least the events at indices 1 and 2.
        let events_from_index = storage
            .read_events_from_index(&chain_id, &stream_id, 1)
            .await?;
        assert!(events_from_index.len() >= 2);
        Ok(())
    }

    /// Exercises the network description: absent at first, then written and read back.
    async fn test_storage_network_description<S: Storage + Sync>(
        storage: &S,
    ) -> Result<(), ViewError>
    where
        S::Context: Send + Sync,
    {
        let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));

        let network_desc = NetworkDescription {
            name: "test_network".to_string(),
            genesis_config_hash: CryptoHash::test_hash("genesis_config"),
            genesis_timestamp: Timestamp::from(0),
            genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
            admin_chain_id,
        };

        // No description exists before the first write.
        assert!(storage.read_network_description().await?.is_none());

        storage.write_network_description(&network_desc).await?;

        // The description reads back equal to what was written.
        let read_desc = storage.read_network_description().await?;
        assert_eq!(read_desc, Some(network_desc));

        Ok(())
    }

    /// Runs all the storage checks above, in sequence, against one storage instance per
    /// backend: memory always, DynamoDB and ScyllaDB when their features are enabled.
    #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
    #[cfg_attr(feature = "dynamodb", test_case(DbStorage::<DynamoDbDatabase, _>::make_test_storage(None).await; "dynamo_db"))]
    #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
    #[test_log::test(tokio::test)]
    async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
    where
        S::Context: Send + Sync,
    {
        test_storage_chain_exporter(&storage).await?;
        test_storage_blob(&storage).await?;
        test_storage_certificate(&storage).await?;
        test_storage_event(&storage).await?;
        test_storage_network_description(&storage).await?;
        Ok(())
    }
}