1mod db_storage;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13 crypto::CryptoHash,
14 data_types::{
15 ApplicationDescription, Blob, ChainDescription, CompressedBytecode, NetworkDescription,
16 TimeDelta, Timestamp,
17 },
18 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
19 vm::VmRuntime,
20};
21use linera_chain::{
22 types::{ConfirmedBlock, ConfirmedBlockCertificate},
23 ChainError, ChainStateView,
24};
25#[cfg(with_revm)]
26use linera_execution::{
27 evm::revm::{EvmContractModule, EvmServiceModule},
28 EvmRuntime,
29};
30use linera_execution::{
31 BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker,
32 UserContractCode, UserServiceCode, WasmRuntime,
33};
34#[cfg(with_wasm_runtime)]
35use linera_execution::{WasmContractModule, WasmServiceModule};
36use linera_views::{context::Context, views::RootView, ViewError};
37
38#[cfg(with_metrics)]
39pub use crate::db_storage::metrics;
40#[cfg(with_testing)]
41pub use crate::db_storage::TestClock;
42pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
43
44pub const DEFAULT_NAMESPACE: &str = "default";
46
47#[cfg_attr(not(web), async_trait)]
49#[cfg_attr(web, async_trait(?Send))]
50pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
51 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
53
54 type Clock: Clock;
56
57 type BlockExporterContext: Context<Extra = u32> + Clone;
59
60 fn clock(&self) -> &Self::Clock;
62
63 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
64
65 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
73
74 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
76
77 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
79
80 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
82
83 async fn read_confirmed_block(
85 &self,
86 hash: CryptoHash,
87 ) -> Result<Option<ConfirmedBlock>, ViewError>;
88
89 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
91
92 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
94
95 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
97
98 async fn read_blob_states(
100 &self,
101 blob_ids: &[BlobId],
102 ) -> Result<Vec<Option<BlobState>>, ViewError>;
103
104 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
106
107 async fn write_blobs_and_certificate(
109 &self,
110 blobs: &[Blob],
111 certificate: &ConfirmedBlockCertificate,
112 ) -> Result<(), ViewError>;
113
114 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
117
118 async fn maybe_write_blob_states(
120 &self,
121 blob_ids: &[BlobId],
122 blob_state: BlobState,
123 ) -> Result<(), ViewError>;
124
125 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
127
128 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
130
131 async fn read_certificate(
133 &self,
134 hash: CryptoHash,
135 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
136
137 async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
139 &self,
140 hashes: I,
141 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
142
143 async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
150 &self,
151 hashes: I,
152 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError>;
153
154 async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
156
157 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
159
160 async fn read_events_from_index(
162 &self,
163 chain_id: &ChainId,
164 stream_id: &StreamId,
165 start_index: u32,
166 ) -> Result<Vec<IndexAndEvent>, ViewError>;
167
168 async fn write_events(
170 &self,
171 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
172 ) -> Result<(), ViewError>;
173
174 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
176
177 async fn write_network_description(
179 &self,
180 information: &NetworkDescription,
181 ) -> Result<(), ViewError>;
182
183 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
191 where
192 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
193 {
194 let id = description.id();
195 self.write_blob(&Blob::new_chain_description(&description))
197 .await?;
198 let mut chain = self.load_chain(id).await?;
199 assert!(!chain.is_active(), "Attempting to create a chain twice");
200 let current_time = self.clock().current_time();
201 chain.initialize_if_needed(current_time).await?;
202 chain.save().await?;
203 Ok(())
204 }
205
206 fn wasm_runtime(&self) -> Option<WasmRuntime>;
208
209 async fn load_contract(
212 &self,
213 application_description: &ApplicationDescription,
214 txn_tracker: &TransactionTracker,
215 ) -> Result<UserContractCode, ExecutionError> {
216 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
217 let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
218 Some(content) => content.clone(),
219 None => self
220 .read_blob(contract_bytecode_blob_id)
221 .await?
222 .ok_or(ExecutionError::BlobsNotFound(vec![
223 contract_bytecode_blob_id,
224 ]))?
225 .into_content(),
226 };
227 let compressed_contract_bytecode = CompressedBytecode {
228 compressed_bytes: content.into_arc_bytes(),
229 };
230 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
231 let contract_bytecode = self
232 .thread_pool()
233 .run_send((), move |()| async move {
234 compressed_contract_bytecode.decompress()
235 })
236 .await
237 .await??;
238 match application_description.module_id.vm_runtime {
239 VmRuntime::Wasm => {
240 cfg_if::cfg_if! {
241 if #[cfg(with_wasm_runtime)] {
242 let Some(wasm_runtime) = self.wasm_runtime() else {
243 panic!("A Wasm runtime is required to load user applications.");
244 };
245 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
246 .await?
247 .into())
248 } else {
249 panic!(
250 "A Wasm runtime is required to load user applications. \
251 Please enable the `wasmer` or the `wasmtime` feature flags \
252 when compiling `linera-storage`."
253 );
254 }
255 }
256 }
257 VmRuntime::Evm => {
258 cfg_if::cfg_if! {
259 if #[cfg(with_revm)] {
260 let evm_runtime = EvmRuntime::Revm;
261 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
262 .into())
263 } else {
264 panic!(
265 "An Evm runtime is required to load user applications. \
266 Please enable the `revm` feature flag \
267 when compiling `linera-storage`."
268 );
269 }
270 }
271 }
272 }
273 }
274
275 async fn load_service(
278 &self,
279 application_description: &ApplicationDescription,
280 txn_tracker: &TransactionTracker,
281 ) -> Result<UserServiceCode, ExecutionError> {
282 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
283 let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
284 Some(content) => content.clone(),
285 None => self
286 .read_blob(service_bytecode_blob_id)
287 .await?
288 .ok_or(ExecutionError::BlobsNotFound(vec![
289 service_bytecode_blob_id,
290 ]))?
291 .into_content(),
292 };
293 let compressed_service_bytecode = CompressedBytecode {
294 compressed_bytes: content.into_arc_bytes(),
295 };
296 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
297 let service_bytecode = self
298 .thread_pool()
299 .run_send((), move |()| async move {
300 compressed_service_bytecode.decompress()
301 })
302 .await
303 .await??;
304 match application_description.module_id.vm_runtime {
305 VmRuntime::Wasm => {
306 cfg_if::cfg_if! {
307 if #[cfg(with_wasm_runtime)] {
308 let Some(wasm_runtime) = self.wasm_runtime() else {
309 panic!("A Wasm runtime is required to load user applications.");
310 };
311 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
312 .await?
313 .into())
314 } else {
315 panic!(
316 "A Wasm runtime is required to load user applications. \
317 Please enable the `wasmer` or the `wasmtime` feature flags \
318 when compiling `linera-storage`."
319 );
320 }
321 }
322 }
323 VmRuntime::Evm => {
324 cfg_if::cfg_if! {
325 if #[cfg(with_revm)] {
326 let evm_runtime = EvmRuntime::Revm;
327 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
328 .into())
329 } else {
330 panic!(
331 "An Evm runtime is required to load user applications. \
332 Please enable the `revm` feature flag \
333 when compiling `linera-storage`."
334 );
335 }
336 }
337 }
338 }
339 }
340
341 async fn block_exporter_context(
342 &self,
343 block_exporter_id: u32,
344 ) -> Result<Self::BlockExporterContext, ViewError>;
345
346 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
348
349 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
351
352 async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
354}
355
356pub enum ResultReadCertificates {
358 Certificates(Vec<ConfirmedBlockCertificate>),
359 InvalidHashes(Vec<CryptoHash>),
360}
361
362impl ResultReadCertificates {
363 pub fn new(
365 certificates: Vec<Option<ConfirmedBlockCertificate>>,
366 hashes: Vec<CryptoHash>,
367 ) -> Self {
368 let (certificates, invalid_hashes) = certificates
369 .into_iter()
370 .zip(hashes)
371 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
372 Some(cert) => itertools::Either::Left(cert),
373 None => itertools::Either::Right(hash),
374 });
375 if invalid_hashes.is_empty() {
376 Self::Certificates(certificates)
377 } else {
378 Self::InvalidHashes(invalid_hashes)
379 }
380 }
381}
382
383#[derive(Clone)]
385pub struct ChainRuntimeContext<S> {
386 storage: S,
387 chain_id: ChainId,
388 thread_pool: Arc<linera_execution::ThreadPool>,
389 execution_runtime_config: ExecutionRuntimeConfig,
390 user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
391 user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
392}
393
394#[cfg_attr(not(web), async_trait)]
395#[cfg_attr(web, async_trait(?Send))]
396impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
397 fn chain_id(&self) -> ChainId {
398 self.chain_id
399 }
400
401 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
402 &self.thread_pool
403 }
404
405 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
406 self.execution_runtime_config
407 }
408
409 fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
410 &self.user_contracts
411 }
412
413 fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
414 &self.user_services
415 }
416
417 async fn get_user_contract(
418 &self,
419 description: &ApplicationDescription,
420 txn_tracker: &TransactionTracker,
421 ) -> Result<UserContractCode, ExecutionError> {
422 let application_id = description.into();
423 let pinned = self.user_contracts.pin_owned();
424 if let Some(contract) = pinned.get(&application_id) {
425 return Ok(contract.clone());
426 }
427 let contract = self.storage.load_contract(description, txn_tracker).await?;
428 pinned.insert(application_id, contract.clone());
429 Ok(contract)
430 }
431
432 async fn get_user_service(
433 &self,
434 description: &ApplicationDescription,
435 txn_tracker: &TransactionTracker,
436 ) -> Result<UserServiceCode, ExecutionError> {
437 let application_id = description.into();
438 let pinned = self.user_services.pin_owned();
439 if let Some(service) = pinned.get(&application_id) {
440 return Ok(service.clone());
441 }
442 let service = self.storage.load_service(description, txn_tracker).await?;
443 pinned.insert(application_id, service.clone());
444 Ok(service)
445 }
446
447 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
448 self.storage.read_blob(blob_id).await
449 }
450
451 async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
452 self.storage.read_event(event_id).await
453 }
454
455 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
456 self.storage.read_network_description().await
457 }
458
459 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
460 self.storage.contains_blob(blob_id).await
461 }
462
463 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
464 self.storage.contains_event(event_id).await
465 }
466
467 #[cfg(with_testing)]
468 async fn add_blobs(
469 &self,
470 blobs: impl IntoIterator<Item = Blob> + Send,
471 ) -> Result<(), ViewError> {
472 let blobs = Vec::from_iter(blobs);
473 self.storage.write_blobs(&blobs).await
474 }
475
476 #[cfg(with_testing)]
477 async fn add_events(
478 &self,
479 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
480 ) -> Result<(), ViewError> {
481 self.storage.write_events(events).await
482 }
483}
484
485#[cfg_attr(not(web), async_trait)]
487#[cfg_attr(web, async_trait(?Send))]
488pub trait Clock {
489 fn current_time(&self) -> Timestamp;
490
491 async fn sleep(&self, delta: TimeDelta);
492
493 async fn sleep_until(&self, timestamp: Timestamp);
494}
495
496#[cfg(test)]
497mod tests {
498 use std::collections::BTreeMap;
499
500 use linera_base::{
501 crypto::{AccountPublicKey, CryptoHash},
502 data_types::{
503 Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
504 Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
505 },
506 identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
507 ownership::ChainOwnership,
508 };
509 use linera_chain::{
510 block::{Block, ConfirmedBlock},
511 data_types::{BlockExecutionOutcome, ProposedBlock},
512 };
513 use linera_execution::BlobState;
514 #[cfg(feature = "dynamodb")]
515 use linera_views::dynamo_db::DynamoDbDatabase;
516 #[cfg(feature = "scylladb")]
517 use linera_views::scylla_db::ScyllaDbDatabase;
518 use linera_views::{memory::MemoryDatabase, ViewError};
519 use test_case::test_case;
520
521 use super::*;
522 use crate::db_storage::DbStorage;
523
524 async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
526 where
527 S::Context: Send + Sync,
528 {
529 let _current_time = storage.clock().current_time();
531 let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));
532
533 let _chain_view = storage.load_chain(test_chain_id).await?;
535
536 let _block_exporter_context = storage.block_exporter_context(0).await?;
538 Ok(())
539 }
540
541 async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
542 where
543 S::Context: Send + Sync,
544 {
545 let chain_description = ChainDescription::new(
547 ChainOrigin::Root(0),
548 InitialChainConfig {
549 ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
550 epoch: Epoch::ZERO,
551 min_active_epoch: Epoch::ZERO,
552 max_active_epoch: Epoch::ZERO,
553 balance: Amount::ZERO,
554 application_permissions: ApplicationPermissions::default(),
555 },
556 Timestamp::from(0),
557 );
558
559 let test_blob1 = Blob::new_chain_description(&chain_description);
560 let test_blob2 = Blob::new_data(vec![10, 20, 30]);
561 let test_blob3 = Blob::new_data(vec![40, 50, 60]);
562
563 let blob_id1 = test_blob1.id();
565 let blob_id2 = test_blob2.id();
566 let blob_id3 = test_blob3.id();
567
568 assert!(!storage.contains_blob(blob_id1).await?);
570 assert!(!storage.contains_blob(blob_id2).await?);
571 assert!(!storage.contains_blob(blob_id3).await?);
572
573 storage.write_blob(&test_blob1).await?;
575 assert!(storage.contains_blob(blob_id1).await?);
576
577 storage
579 .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
580 .await?;
581 assert!(storage.contains_blob(blob_id2).await?);
582 assert!(storage.contains_blob(blob_id3).await?);
583
584 let read_blob = storage.read_blob(blob_id1).await?;
586 assert_eq!(read_blob, Some(test_blob1.clone()));
587
588 let blob_ids = vec![blob_id1, blob_id2, blob_id3];
590 let read_blobs = storage.read_blobs(&blob_ids).await?;
591 assert_eq!(read_blobs.len(), 3);
592
593 assert_eq!(read_blobs[0], Some(test_blob1.clone()));
595 assert_eq!(read_blobs[1], Some(test_blob2));
596 assert_eq!(read_blobs[2], Some(test_blob3));
597
598 let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
600 let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
601 assert_eq!(missing_blobs, vec![missing_blob_id]);
602
603 let write_results = storage.maybe_write_blobs(&[test_blob1.clone()]).await?;
605 assert_eq!(write_results, vec![false]);
606
607 let blob_state1 = BlobState {
609 last_used_by: None,
610 chain_id: ChainId(CryptoHash::test_hash("chain1")),
611 block_height: BlockHeight(0),
612 epoch: Some(Epoch::ZERO),
613 };
614 let blob_state2 = BlobState {
615 last_used_by: Some(CryptoHash::test_hash("cert")),
616 chain_id: ChainId(CryptoHash::test_hash("chain2")),
617 block_height: BlockHeight(1),
618 epoch: Some(Epoch::from(1)),
619 };
620
621 assert!(!storage.contains_blob_state(blob_id1).await?);
623 assert!(!storage.contains_blob_state(blob_id2).await?);
624
625 storage
627 .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
628 .await?;
629 storage
630 .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
631 .await?;
632
633 assert!(storage.contains_blob_state(blob_id1).await?);
635 assert!(storage.contains_blob_state(blob_id2).await?);
636
637 let read_blob_state = storage.read_blob_state(blob_id1).await?;
639 assert_eq!(read_blob_state, Some(blob_state1.clone()));
640
641 let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
643 assert_eq!(read_blob_states.len(), 2);
644
645 assert_eq!(read_blob_states[0], Some(blob_state1));
647 assert_eq!(read_blob_states[1], Some(blob_state2));
648
649 let write_results = storage.maybe_write_blobs(&[test_blob1.clone()]).await?;
651 assert_eq!(write_results, vec![true]);
652
653 Ok(())
654 }
655
656 async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
657 where
658 S::Context: Send + Sync,
659 {
660 let cert_hash = CryptoHash::test_hash("certificate");
661
662 assert!(!storage.contains_certificate(cert_hash).await?);
664
665 assert!(storage.read_certificate(cert_hash).await?.is_none());
667
668 let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
670 let certs_result = storage.read_certificates(cert_hashes.clone()).await?;
671 assert_eq!(certs_result.len(), 2);
672 assert!(certs_result[0].is_none());
673 assert!(certs_result[1].is_none());
674
675 let raw_certs_result = storage.read_certificates_raw(cert_hashes).await?;
677 assert!(raw_certs_result.is_empty()); let block_hash = CryptoHash::test_hash("block");
681 let block_result = storage.read_confirmed_block(block_hash).await?;
682 assert!(block_result.is_none());
683
684 let test_blob1 = Blob::new_data(vec![1, 2, 3]);
687 let test_blob2 = Blob::new_data(vec![4, 5, 6]);
688 let blobs = vec![test_blob1, test_blob2];
689
690 let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));
692
693 let proposed_block = ProposedBlock {
695 epoch: Epoch::ZERO,
696 chain_id,
697 transactions: vec![],
698 previous_block_hash: None,
699 height: BlockHeight::ZERO,
700 authenticated_owner: None,
701 timestamp: Timestamp::default(),
702 };
703
704 let outcome = BlockExecutionOutcome {
706 messages: vec![],
707 state_hash: CryptoHash::default(),
708 oracle_responses: vec![],
709 events: vec![],
710 blobs: vec![],
711 operation_results: vec![],
712 previous_event_blocks: BTreeMap::new(),
713 previous_message_blocks: BTreeMap::new(),
714 };
715
716 let block = Block::new(proposed_block, outcome);
717 let confirmed_block = ConfirmedBlock::new(block);
718 let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
719
720 storage
722 .write_blobs_and_certificate(&blobs, &certificate)
723 .await?;
724
725 let cert_hash = certificate.hash();
727 assert!(storage.contains_certificate(cert_hash).await?);
728
729 let read_certificate = storage.read_certificate(cert_hash).await?;
731 assert!(read_certificate.is_some());
732 assert_eq!(read_certificate.unwrap().hash(), cert_hash);
733
734 for blob in &blobs {
736 assert!(storage.contains_blob(blob.id()).await?);
737 }
738
739 Ok(())
740 }
741
742 async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
743 where
744 S::Context: Send + Sync,
745 {
746 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
747 let stream_id = StreamId::system("test_stream");
748
749 let event_id1 = EventId {
751 chain_id,
752 stream_id: stream_id.clone(),
753 index: 0,
754 };
755 let event_id2 = EventId {
756 chain_id,
757 stream_id: stream_id.clone(),
758 index: 1,
759 };
760 let event_id3 = EventId {
761 chain_id,
762 stream_id: stream_id.clone(),
763 index: 2,
764 };
765
766 let event_data1 = vec![1, 2, 3];
767 let event_data2 = vec![4, 5, 6];
768 let event_data3 = vec![7, 8, 9];
769
770 assert!(!storage.contains_event(event_id1.clone()).await?);
772 assert!(!storage.contains_event(event_id2.clone()).await?);
773
774 storage
776 .write_events([
777 (event_id1.clone(), event_data1.clone()),
778 (event_id2.clone(), event_data2.clone()),
779 (event_id3.clone(), event_data3.clone()),
780 ])
781 .await?;
782
783 assert!(storage.contains_event(event_id1.clone()).await?);
785 assert!(storage.contains_event(event_id2.clone()).await?);
786 assert!(storage.contains_event(event_id3.clone()).await?);
787
788 let read_event1 = storage.read_event(event_id1).await?;
790 assert_eq!(read_event1, Some(event_data1));
791
792 let read_event2 = storage.read_event(event_id2).await?;
793 assert_eq!(read_event2, Some(event_data2));
794
795 let events_from_index = storage
797 .read_events_from_index(&chain_id, &stream_id, 1)
798 .await?;
799 assert!(events_from_index.len() >= 2); Ok(())
801 }
802
803 async fn test_storage_network_description<S: Storage + Sync>(
804 storage: &S,
805 ) -> Result<(), ViewError>
806 where
807 S::Context: Send + Sync,
808 {
809 let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));
810
811 let network_desc = NetworkDescription {
812 name: "test_network".to_string(),
813 genesis_config_hash: CryptoHash::test_hash("genesis_config"),
814 genesis_timestamp: Timestamp::from(0),
815 genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
816 admin_chain_id,
817 };
818
819 assert!(storage.read_network_description().await?.is_none());
821
822 storage.write_network_description(&network_desc).await?;
824
825 let read_desc = storage.read_network_description().await?;
827 assert_eq!(read_desc, Some(network_desc));
828
829 Ok(())
830 }
831
832 #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
834 #[cfg_attr(feature = "dynamodb", test_case(DbStorage::<DynamoDbDatabase, _>::make_test_storage(None).await; "dynamo_db"))]
835 #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
836 #[test_log::test(tokio::test)]
837 async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
838 where
839 S::Context: Send + Sync,
840 {
841 test_storage_chain_exporter(&storage).await?;
842 test_storage_blob(&storage).await?;
843 test_storage_certificate(&storage).await?;
844 test_storage_event(&storage).await?;
845 test_storage_network_description(&storage).await?;
846 Ok(())
847 }
848}