1#![deny(missing_docs)]
7
8mod db_storage;
9
10use std::sync::Arc as StdArc;
11
12use async_trait::async_trait;
13use itertools::Itertools;
14use linera_base::{
15 crypto::CryptoHash,
16 data_types::{
17 ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode, Epoch,
18 NetworkDescription, Timestamp,
19 },
20 identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
21 vm::VmRuntime,
22};
23pub use linera_cache::{Arc, DEFAULT_CLEANUP_INTERVAL_SECS};
24use linera_chain::{
25 types::{ConfirmedBlock, ConfirmedBlockCertificate},
26 ChainError, ChainStateView,
27};
28use linera_execution::{
29 committee::Committee, BlobState, ExecutionError, ExecutionRuntimeConfig,
30 ExecutionRuntimeContext, SharedCommittees, TransactionTracker, UserContractCode,
31 UserServiceCode, WasmRuntime,
32};
33#[cfg(with_revm)]
34use linera_execution::{
35 evm::revm::{EvmContractModule, EvmServiceModule},
36 EvmRuntime,
37};
38#[cfg(with_wasm_runtime)]
39use linera_execution::{WasmContractModule, WasmServiceModule};
40use linera_views::{context::Context, views::RootView, ViewError};
41
42#[cfg(with_metrics)]
43pub use crate::db_storage::metrics;
44pub use crate::db_storage::{
45 ChainStatesFirstAssignment, DbStorage, RootKey, StorageCacheConfig, StorageCaches, WallClock,
46};
47#[cfg(with_testing)]
48pub use crate::db_storage::{TestClock, DEFAULT_STORAGE_CACHE_CONFIG};
49
50pub const DEFAULT_NAMESPACE: &str = "default";
52
53#[cfg_attr(not(web), async_trait)]
55#[cfg_attr(web, async_trait(?Send))]
56pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
57 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
59
60 type Clock: Clock + Clone + Send + Sync;
62
63 type BlockExporterContext: Context<Extra = u32> + Clone;
65
66 fn clock(&self) -> &Self::Clock;
68
69 fn thread_pool(&self) -> &StdArc<linera_execution::ThreadPool>;
71
72 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
80
81 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
83
84 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
86
87 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
89
90 async fn read_confirmed_block(
92 &self,
93 hash: CryptoHash,
94 ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError>;
95
96 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
98 &self,
99 hashes: I,
100 ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError>;
101
102 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError>;
104
105 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError>;
107
108 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
110
111 async fn read_blob_states(
113 &self,
114 blob_ids: &[BlobId],
115 ) -> Result<Vec<Option<BlobState>>, ViewError>;
116
117 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
119
120 async fn write_blobs_and_certificate(
122 &self,
123 blobs: &[Blob],
124 certificate: &ConfirmedBlockCertificate,
125 ) -> Result<(), ViewError>;
126
127 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
130
131 async fn maybe_write_blob_states(
133 &self,
134 blob_ids: &[BlobId],
135 blob_state: BlobState,
136 ) -> Result<(), ViewError>;
137
138 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
140
141 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
143
144 fn cache_certificate(
151 &self,
152 certificate: ConfirmedBlockCertificate,
153 ) -> Arc<ConfirmedBlockCertificate>;
154
155 fn cache_blob(&self, blob: Blob) -> Arc<Blob>;
162
163 fn cache_confirmed_block(&self, block: ConfirmedBlock) -> Arc<ConfirmedBlock>;
170
171 async fn read_certificate(
173 &self,
174 hash: CryptoHash,
175 ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError>;
176
177 async fn read_certificates(
179 &self,
180 hashes: &[CryptoHash],
181 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
182
183 async fn read_certificates_raw(
189 &self,
190 hashes: &[CryptoHash],
191 ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
192
193 async fn read_certificates_by_heights(
197 &self,
198 chain_id: ChainId,
199 heights: &[BlockHeight],
200 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError>;
201
202 async fn read_certificates_by_heights_raw(
207 &self,
208 chain_id: ChainId,
209 heights: &[BlockHeight],
210 ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError>;
211
212 async fn read_certificate_hashes_by_heights(
216 &self,
217 chain_id: ChainId,
218 heights: &[BlockHeight],
219 ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
220
221 async fn read_event_block_heights(
224 &self,
225 event_ids: &[EventId],
226 ) -> Result<Vec<Option<BlockHeight>>, ViewError>;
227
228 async fn read_event(&self, id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError>;
230
231 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
233
234 async fn read_events_from_index(
236 &self,
237 chain_id: &ChainId,
238 stream_id: &StreamId,
239 start_index: u32,
240 ) -> Result<Vec<IndexAndEvent>, ViewError>;
241
242 async fn write_events(
244 &self,
245 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
246 ) -> Result<(), ViewError>;
247
248 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
250
251 async fn write_network_description(
253 &self,
254 information: &NetworkDescription,
255 ) -> Result<(), ViewError>;
256
257 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
265 where
266 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
267 {
268 let id = description.id();
269 let description_blob = Blob::new_chain_description(&description);
273 let description_blob_id = description_blob.id();
274 self.write_blob(&description_blob).await?;
275 self.maybe_write_blob_states(&[description_blob_id], BlobState::GENESIS)
276 .await?;
277 let mut chain = self.load_chain(id).await?;
278 assert!(
279 !chain.is_active().await?,
280 "Attempting to create a chain twice"
281 );
282 let current_time = self.clock().current_time();
283 chain.initialize_if_needed(current_time).await?;
284 chain.save().await?;
285 Ok(())
286 }
287
288 fn wasm_runtime(&self) -> Option<WasmRuntime>;
290
291 async fn load_contract(
294 &self,
295 application_description: &ApplicationDescription,
296 txn_tracker: &TransactionTracker,
297 ) -> Result<UserContractCode, ExecutionError> {
298 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
299 let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
300 Some(content) => content.clone(),
301 None => self
302 .read_blob(contract_bytecode_blob_id)
303 .await?
304 .ok_or(ExecutionError::BlobsNotFound(vec![
305 contract_bytecode_blob_id,
306 ]))?
307 .content()
308 .clone(),
309 };
310 let compressed_contract_bytecode = CompressedBytecode {
311 compressed_bytes: content.into_arc_bytes(),
312 };
313 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
314 let contract_bytecode = self
315 .thread_pool()
316 .run_send((), move |()| async move {
317 compressed_contract_bytecode.decompress()
318 })
319 .await
320 .await??;
321 match application_description.module_id.vm_runtime {
322 VmRuntime::Wasm => {
323 cfg_if::cfg_if! {
324 if #[cfg(with_wasm_runtime)] {
325 let Some(wasm_runtime) = self.wasm_runtime() else {
326 panic!("A Wasm runtime is required to load user applications.");
327 };
328 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
329 .await?
330 .into())
331 } else {
332 panic!(
333 "A Wasm runtime is required to load user applications. \
334 Please enable the `wasmer` or the `wasmtime` feature flags \
335 when compiling `linera-storage`."
336 );
337 }
338 }
339 }
340 VmRuntime::Evm => {
341 cfg_if::cfg_if! {
342 if #[cfg(with_revm)] {
343 let evm_runtime = EvmRuntime::Revm;
344 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
345 .into())
346 } else {
347 panic!(
348 "An Evm runtime is required to load user applications. \
349 Please enable the `revm` feature flag \
350 when compiling `linera-storage`."
351 );
352 }
353 }
354 }
355 }
356 }
357
358 async fn load_service(
361 &self,
362 application_description: &ApplicationDescription,
363 txn_tracker: &TransactionTracker,
364 ) -> Result<UserServiceCode, ExecutionError> {
365 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
366 let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
367 Some(content) => content.clone(),
368 None => self
369 .read_blob(service_bytecode_blob_id)
370 .await?
371 .ok_or(ExecutionError::BlobsNotFound(vec![
372 service_bytecode_blob_id,
373 ]))?
374 .content()
375 .clone(),
376 };
377 let compressed_service_bytecode = CompressedBytecode {
378 compressed_bytes: content.into_arc_bytes(),
379 };
380 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
381 let service_bytecode = self
382 .thread_pool()
383 .run_send((), move |()| async move {
384 compressed_service_bytecode.decompress()
385 })
386 .await
387 .await??;
388 match application_description.module_id.vm_runtime {
389 VmRuntime::Wasm => {
390 cfg_if::cfg_if! {
391 if #[cfg(with_wasm_runtime)] {
392 let Some(wasm_runtime) = self.wasm_runtime() else {
393 panic!("A Wasm runtime is required to load user applications.");
394 };
395 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
396 .await?
397 .into())
398 } else {
399 panic!(
400 "A Wasm runtime is required to load user applications. \
401 Please enable the `wasmer` or the `wasmtime` feature flags \
402 when compiling `linera-storage`."
403 );
404 }
405 }
406 }
407 VmRuntime::Evm => {
408 cfg_if::cfg_if! {
409 if #[cfg(with_revm)] {
410 let evm_runtime = EvmRuntime::Revm;
411 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
412 .into())
413 } else {
414 panic!(
415 "An Evm runtime is required to load user applications. \
416 Please enable the `revm` feature flag \
417 when compiling `linera-storage`."
418 );
419 }
420 }
421 }
422 }
423 }
424
425 async fn block_exporter_context(
427 &self,
428 block_exporter_id: u32,
429 ) -> Result<Self::BlockExporterContext, ViewError>;
430
431 fn shared_committees(&self) -> &SharedCommittees;
433
434 async fn get_or_load_committee_by_hash(
437 &self,
438 hash: CryptoHash,
439 ) -> Result<StdArc<Committee>, ExecutionError> {
440 if let Some(committee) = self.shared_committees().get(hash) {
441 return Ok(committee);
442 }
443 let blob_id = BlobId::new(hash, BlobType::Committee);
444 let blob = self
445 .read_blob(blob_id)
446 .await?
447 .ok_or(ExecutionError::BlobsNotFound(vec![blob_id]))?;
448 let committee = bcs::from_bytes(blob.bytes())?;
449 Ok(self
450 .shared_committees()
451 .insert(hash, StdArc::new(committee)))
452 }
453
454 async fn is_epoch_revoked(&self, epoch: Epoch) -> Result<bool, ExecutionError> {
457 let net_desc = self
458 .read_network_description()
459 .await?
460 .ok_or(ExecutionError::NoNetworkDescriptionFound)?;
461 let event_id = EventId {
462 chain_id: net_desc.admin_chain_id,
463 stream_id: StreamId::system(linera_execution::system::REMOVED_EPOCH_STREAM_NAME),
464 index: epoch.0,
465 };
466 Ok(self.contains_event(event_id).await?)
467 }
468
469 async fn committee_for_epoch(
474 &self,
475 epoch: Epoch,
476 ) -> Result<Option<StdArc<Committee>>, ExecutionError> {
477 let blob_hash = if epoch == Epoch::ZERO {
478 self.read_network_description()
479 .await?
480 .ok_or(ExecutionError::NoNetworkDescriptionFound)?
481 .genesis_committee_blob_hash
482 } else {
483 let net_desc = self
484 .read_network_description()
485 .await?
486 .ok_or(ExecutionError::NoNetworkDescriptionFound)?;
487 let event_id = EventId {
488 chain_id: net_desc.admin_chain_id,
489 stream_id: StreamId::system(linera_execution::system::EPOCH_STREAM_NAME),
490 index: epoch.0,
491 };
492 let Some(bytes) = self.read_event(event_id).await? else {
493 return Ok(None);
494 };
495 let event_data: linera_execution::system::EpochEventData = bcs::from_bytes(&bytes)?;
496 event_data.blob_hash
497 };
498 Ok(Some(self.get_or_load_committee_by_hash(blob_hash).await?))
499 }
500
501 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError>;
503
504 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError>;
506
507 async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError>;
509}
510
511pub enum ResultReadCertificates {
513 Certificates(Vec<ConfirmedBlockCertificate>),
515 InvalidHashes(Vec<CryptoHash>),
517}
518
519impl ResultReadCertificates {
520 pub fn new(
522 certificates: Vec<Option<Arc<ConfirmedBlockCertificate>>>,
523 hashes: Vec<CryptoHash>,
524 ) -> Self {
525 let (certificates, invalid_hashes) = certificates
526 .into_iter()
527 .zip(hashes)
528 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
529 Some(cert) => itertools::Either::Left(Arc::unwrap_or_clone(cert)),
530 None => itertools::Either::Right(hash),
531 });
532 if invalid_hashes.is_empty() {
533 Self::Certificates(certificates)
534 } else {
535 Self::InvalidHashes(invalid_hashes)
536 }
537 }
538}
539
540#[derive(Clone)]
542pub struct ChainRuntimeContext<S> {
543 storage: S,
544 chain_id: ChainId,
545 thread_pool: StdArc<linera_execution::ThreadPool>,
546 execution_runtime_config: ExecutionRuntimeConfig,
547 user_contracts: StdArc<papaya::HashMap<ApplicationId, UserContractCode>>,
548 user_services: StdArc<papaya::HashMap<ApplicationId, UserServiceCode>>,
549}
550
551#[cfg_attr(not(web), async_trait)]
552#[cfg_attr(web, async_trait(?Send))]
553impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
554 fn chain_id(&self) -> ChainId {
555 self.chain_id
556 }
557
558 fn thread_pool(&self) -> &StdArc<linera_execution::ThreadPool> {
559 &self.thread_pool
560 }
561
562 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
563 self.execution_runtime_config
564 }
565
566 fn user_contracts(&self) -> &StdArc<papaya::HashMap<ApplicationId, UserContractCode>> {
567 &self.user_contracts
568 }
569
570 fn user_services(&self) -> &StdArc<papaya::HashMap<ApplicationId, UserServiceCode>> {
571 &self.user_services
572 }
573
574 async fn get_user_contract(
575 &self,
576 description: &ApplicationDescription,
577 txn_tracker: &TransactionTracker,
578 ) -> Result<UserContractCode, ExecutionError> {
579 let application_id = description.into();
580 let pinned = self.user_contracts.pin_owned();
581 if let Some(contract) = pinned.get(&application_id) {
582 return Ok(contract.clone());
583 }
584 let contract = self.storage.load_contract(description, txn_tracker).await?;
585 pinned.insert(application_id, contract.clone());
586 Ok(contract)
587 }
588
589 async fn get_user_service(
590 &self,
591 description: &ApplicationDescription,
592 txn_tracker: &TransactionTracker,
593 ) -> Result<UserServiceCode, ExecutionError> {
594 let application_id = description.into();
595 let pinned = self.user_services.pin_owned();
596 if let Some(service) = pinned.get(&application_id) {
597 return Ok(service.clone());
598 }
599 let service = self.storage.load_service(description, txn_tracker).await?;
600 pinned.insert(application_id, service.clone());
601 Ok(service)
602 }
603
604 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<StdArc<Blob>>, ViewError> {
605 Ok(self.storage.read_blob(blob_id).await?.map(Arc::into_std))
606 }
607
608 async fn get_event(&self, event_id: EventId) -> Result<Option<StdArc<Vec<u8>>>, ViewError> {
609 Ok(self.storage.read_event(event_id).await?.map(Arc::into_std))
610 }
611
612 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
613 self.storage.read_network_description().await
614 }
615
616 async fn get_or_load_committee_by_hash(
617 &self,
618 hash: CryptoHash,
619 ) -> Result<StdArc<Committee>, ExecutionError> {
620 self.storage.get_or_load_committee_by_hash(hash).await
621 }
622
623 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
624 self.storage.contains_blob(blob_id).await
625 }
626
627 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
628 self.storage.contains_event(event_id).await
629 }
630
631 #[cfg(with_testing)]
632 async fn add_blobs(
633 &self,
634 blobs: impl IntoIterator<Item = Blob> + Send,
635 ) -> Result<(), ViewError> {
636 let blobs = Vec::from_iter(blobs);
637 self.storage.write_blobs(&blobs).await
638 }
639
640 #[cfg(with_testing)]
641 async fn add_events(
642 &self,
643 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
644 ) -> Result<(), ViewError> {
645 self.storage.write_events(events).await
646 }
647}
648
649#[cfg_attr(not(web), async_trait)]
651#[cfg_attr(web, async_trait(?Send))]
652pub trait Clock {
653 fn current_time(&self) -> Timestamp;
655
656 async fn sleep_until(&self, timestamp: Timestamp);
658}
659
660#[cfg(test)]
661mod tests {
662 use std::collections::BTreeMap;
663
664 use linera_base::{
665 crypto::{AccountPublicKey, CryptoHash},
666 data_types::{
667 Amount, ApplicationPermissions, Blob, BlockHeight, ChainDescription, ChainOrigin,
668 Epoch, InitialChainConfig, NetworkDescription, Round, Timestamp,
669 },
670 identifiers::{BlobId, BlobType, ChainId, EventId, StreamId},
671 ownership::ChainOwnership,
672 };
673 use linera_chain::{
674 block::{Block, ConfirmedBlock},
675 data_types::{BlockExecutionOutcome, ProposedBlock},
676 };
677 use linera_execution::{BlobOrigin, BlobState};
678 #[cfg(feature = "scylladb")]
679 use linera_views::scylla_db::ScyllaDbDatabase;
680 use linera_views::{memory::MemoryDatabase, ViewError};
681 use test_case::test_case;
682
683 use super::*;
684 use crate::db_storage::DbStorage;
685
686 async fn test_storage_chain_exporter<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
688 where
689 S::Context: Send + Sync,
690 {
691 let _current_time = storage.clock().current_time();
693 let test_chain_id = ChainId(CryptoHash::test_hash("test_chain"));
694
695 let _chain_view = storage.load_chain(test_chain_id).await?;
697
698 let _block_exporter_context = storage.block_exporter_context(0).await?;
700 Ok(())
701 }
702
703 async fn test_storage_blob<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
704 where
705 S::Context: Send + Sync,
706 {
707 let chain_description = ChainDescription::new(
709 ChainOrigin::Root(0),
710 InitialChainConfig {
711 ownership: ChainOwnership::single(AccountPublicKey::test_key(0).into()),
712 epoch: Epoch::ZERO,
713 balance: Amount::ZERO,
714 application_permissions: ApplicationPermissions::default(),
715 },
716 Timestamp::from(0),
717 );
718
719 let test_blob1 = Blob::new_chain_description(&chain_description);
720 let test_blob2 = Blob::new_data(vec![10, 20, 30]);
721 let test_blob3 = Blob::new_data(vec![40, 50, 60]);
722
723 let blob_id1 = test_blob1.id();
725 let blob_id2 = test_blob2.id();
726 let blob_id3 = test_blob3.id();
727
728 assert!(!storage.contains_blob(blob_id1).await?);
730 assert!(!storage.contains_blob(blob_id2).await?);
731 assert!(!storage.contains_blob(blob_id3).await?);
732
733 storage.write_blob(&test_blob1).await?;
735 assert!(storage.contains_blob(blob_id1).await?);
736
737 storage
739 .write_blobs(&[test_blob2.clone(), test_blob3.clone()])
740 .await?;
741 assert!(storage.contains_blob(blob_id2).await?);
742 assert!(storage.contains_blob(blob_id3).await?);
743
744 let read_blob = storage.read_blob(blob_id1).await?;
746 assert_eq!(read_blob.as_deref(), Some(&test_blob1));
747
748 let blob_ids = vec![blob_id1, blob_id2, blob_id3];
750 let read_blobs = storage.read_blobs(&blob_ids).await?;
751 assert_eq!(read_blobs.len(), 3);
752
753 assert_eq!(read_blobs[0].as_deref(), Some(&test_blob1));
755 assert_eq!(read_blobs[1].as_deref(), Some(&test_blob2));
756 assert_eq!(read_blobs[2].as_deref(), Some(&test_blob3));
757
758 let missing_blob_id = BlobId::new(CryptoHash::test_hash("missing"), BlobType::Data);
760 let missing_blobs = storage.missing_blobs(&[blob_id1, missing_blob_id]).await?;
761 assert_eq!(missing_blobs, vec![missing_blob_id]);
762
763 let write_results = storage
765 .maybe_write_blobs(std::slice::from_ref(&test_blob1))
766 .await?;
767 assert_eq!(write_results, vec![false]);
768
769 let blob_state1 = BlobState {
771 origin: BlobOrigin::Published {
772 chain_id: ChainId(CryptoHash::test_hash("chain1")),
773 block_height: BlockHeight(0),
774 },
775 last_used_by: None,
776 epoch: Some(Epoch::ZERO),
777 };
778 let blob_state2 = BlobState {
779 origin: BlobOrigin::Published {
780 chain_id: ChainId(CryptoHash::test_hash("chain2")),
781 block_height: BlockHeight(1),
782 },
783 last_used_by: Some(CryptoHash::test_hash("cert")),
784 epoch: Some(Epoch::from(1)),
785 };
786
787 assert!(!storage.contains_blob_state(blob_id1).await?);
789 assert!(!storage.contains_blob_state(blob_id2).await?);
790
791 storage
793 .maybe_write_blob_states(&[blob_id1], blob_state1.clone())
794 .await?;
795 storage
796 .maybe_write_blob_states(&[blob_id2], blob_state2.clone())
797 .await?;
798
799 assert!(storage.contains_blob_state(blob_id1).await?);
801 assert!(storage.contains_blob_state(blob_id2).await?);
802
803 let read_blob_state = storage.read_blob_state(blob_id1).await?;
805 assert_eq!(read_blob_state, Some(blob_state1.clone()));
806
807 let read_blob_states = storage.read_blob_states(&[blob_id1, blob_id2]).await?;
809 assert_eq!(read_blob_states.len(), 2);
810
811 assert_eq!(read_blob_states[0], Some(blob_state1));
813 assert_eq!(read_blob_states[1], Some(blob_state2));
814
815 let write_results = storage
817 .maybe_write_blobs(std::slice::from_ref(&test_blob1))
818 .await?;
819 assert_eq!(write_results, vec![true]);
820
821 Ok(())
822 }
823
824 async fn test_storage_certificate<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
825 where
826 S::Context: Send + Sync,
827 {
828 let cert_hash = CryptoHash::test_hash("certificate");
829
830 assert!(!storage.contains_certificate(cert_hash).await?);
832
833 assert!(storage.read_certificate(cert_hash).await?.is_none());
835
836 let cert_hashes = vec![cert_hash, CryptoHash::test_hash("cert2")];
838 let certs_result = storage.read_certificates(&cert_hashes).await?;
839 assert_eq!(certs_result.len(), 2);
840 assert!(certs_result[0].is_none());
841 assert!(certs_result[1].is_none());
842
843 let raw_certs_result = storage.read_certificates_raw(&cert_hashes).await?;
845 assert!(raw_certs_result.iter().all(|cert| cert.is_none())); let block_hash = CryptoHash::test_hash("block");
849 let block_result = storage.read_confirmed_block(block_hash).await?;
850 assert!(block_result.is_none());
851
852 let test_blob1 = Blob::new_data(vec![1, 2, 3]);
855 let test_blob2 = Blob::new_data(vec![4, 5, 6]);
856 let blobs = vec![test_blob1, test_blob2];
857
858 let chain_id = ChainId(CryptoHash::test_hash("test_chain_cert"));
860
861 let proposed_block = ProposedBlock {
863 epoch: Epoch::ZERO,
864 chain_id,
865 transactions: vec![],
866 previous_block_hash: None,
867 height: BlockHeight::ZERO,
868 authenticated_owner: None,
869 timestamp: Timestamp::default(),
870 };
871
872 let outcome = BlockExecutionOutcome {
874 messages: vec![],
875 state_hash: CryptoHash::default(),
876 oracle_responses: vec![],
877 events: vec![],
878 blobs: vec![],
879 operation_results: vec![],
880 previous_event_blocks: BTreeMap::new(),
881 previous_message_blocks: BTreeMap::new(),
882 };
883
884 let block = Block::new(proposed_block, outcome);
885 let confirmed_block = ConfirmedBlock::new(block);
886 let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
887
888 storage
890 .write_blobs_and_certificate(&blobs, &certificate)
891 .await?;
892
893 let cert_hash = certificate.hash();
895 assert!(storage.contains_certificate(cert_hash).await?);
896
897 let read_certificate = storage.read_certificate(cert_hash).await?;
899 assert!(read_certificate.is_some());
900 assert_eq!(read_certificate.unwrap().hash(), cert_hash);
901
902 for blob in &blobs {
904 assert!(storage.contains_blob(blob.id()).await?);
905 }
906
907 Ok(())
908 }
909
910 async fn test_storage_event<S: Storage + Sync>(storage: &S) -> Result<(), ViewError>
911 where
912 S::Context: Send + Sync,
913 {
914 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
915 let stream_id = StreamId::system("test_stream");
916
917 let event_id1 = EventId {
919 chain_id,
920 stream_id: stream_id.clone(),
921 index: 0,
922 };
923 let event_id2 = EventId {
924 chain_id,
925 stream_id: stream_id.clone(),
926 index: 1,
927 };
928 let event_id3 = EventId {
929 chain_id,
930 stream_id: stream_id.clone(),
931 index: 2,
932 };
933
934 let event_data1 = vec![1, 2, 3];
935 let event_data2 = vec![4, 5, 6];
936 let event_data3 = vec![7, 8, 9];
937
938 assert!(!storage.contains_event(event_id1.clone()).await?);
940 assert!(!storage.contains_event(event_id2.clone()).await?);
941
942 storage
944 .write_events([
945 (event_id1.clone(), event_data1.clone()),
946 (event_id2.clone(), event_data2.clone()),
947 (event_id3.clone(), event_data3.clone()),
948 ])
949 .await?;
950
951 assert!(storage.contains_event(event_id1.clone()).await?);
953 assert!(storage.contains_event(event_id2.clone()).await?);
954 assert!(storage.contains_event(event_id3.clone()).await?);
955
956 let read_event1 = storage.read_event(event_id1).await?;
958 assert_eq!(read_event1.as_deref(), Some(&event_data1));
959
960 let read_event2 = storage.read_event(event_id2).await?;
961 assert_eq!(read_event2.as_deref(), Some(&event_data2));
962
963 let events_from_index = storage
965 .read_events_from_index(&chain_id, &stream_id, 1)
966 .await?;
967 assert!(events_from_index.len() >= 2); Ok(())
969 }
970
971 async fn test_storage_network_description<S: Storage + Sync>(
972 storage: &S,
973 ) -> Result<(), ViewError>
974 where
975 S::Context: Send + Sync,
976 {
977 let admin_chain_id = ChainId(CryptoHash::test_hash("test_chain_second"));
978
979 let network_desc = NetworkDescription {
980 name: "test_network".to_string(),
981 genesis_config_hash: CryptoHash::test_hash("genesis_config"),
982 genesis_timestamp: Timestamp::from(0),
983 genesis_committee_blob_hash: CryptoHash::test_hash("committee"),
984 admin_chain_id,
985 };
986
987 assert!(storage.read_network_description().await?.is_none());
989
990 storage.write_network_description(&network_desc).await?;
992
993 let read_desc = storage.read_network_description().await?;
995 assert_eq!(read_desc, Some(network_desc));
996
997 Ok(())
998 }
999
1000 #[test_case(DbStorage::<MemoryDatabase, _>::make_test_storage(None).await; "memory")]
1002 #[cfg_attr(feature = "scylladb", test_case(DbStorage::<ScyllaDbDatabase, _>::make_test_storage(None).await; "scylla_db"))]
1003 #[test_log::test(tokio::test)]
1004 async fn test_storage_features<S: Storage + Sync>(storage: S) -> Result<(), ViewError>
1005 where
1006 S::Context: Send + Sync,
1007 {
1008 test_storage_chain_exporter(&storage).await?;
1009 test_storage_blob(&storage).await?;
1010 test_storage_certificate(&storage).await?;
1011 test_storage_event(&storage).await?;
1012 test_storage_network_description(&storage).await?;
1013 Ok(())
1014 }
1015}