1mod db_storage;
7
8use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc};
9
10use async_trait::async_trait;
11use itertools::Itertools;
12use linera_base::{
13 crypto::CryptoHash,
14 data_types::{
15 ApplicationDescription, Blob, ChainDescription, CompressedBytecode, Epoch,
16 NetworkDescription, TimeDelta, Timestamp,
17 },
18 identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
19 vm::VmRuntime,
20};
21use linera_chain::{
22 types::{ConfirmedBlock, ConfirmedBlockCertificate},
23 ChainError, ChainStateView,
24};
25use linera_execution::{
26 committee::Committee, system::EPOCH_STREAM_NAME, BlobState, ExecutionError,
27 ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker, UserContractCode,
28 UserServiceCode, WasmRuntime,
29};
30#[cfg(with_revm)]
31use linera_execution::{
32 evm::revm::{EvmContractModule, EvmServiceModule},
33 EvmRuntime,
34};
35#[cfg(with_wasm_runtime)]
36use linera_execution::{WasmContractModule, WasmServiceModule};
37use linera_views::{context::Context, views::RootView, ViewError};
38
39#[cfg(with_metrics)]
40pub use crate::db_storage::metrics;
41#[cfg(with_testing)]
42pub use crate::db_storage::TestClock;
43pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
44
/// Default namespace name, `"table_linera"`.
///
/// NOTE(review): presumably the namespace storage backends fall back to when
/// none is configured explicitly; confirm against the callers.
pub const DEFAULT_NAMESPACE: &str = "table_linera";
47
48#[cfg_attr(not(web), async_trait)]
50#[cfg_attr(web, async_trait(?Send))]
51pub trait Storage: Sized {
52 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;
54
55 type Clock: Clock;
57
58 type BlockExporterContext: Context<Extra = u32> + Clone + Send + Sync + 'static;
60
61 fn clock(&self) -> &Self::Clock;
63
64 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
72
73 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
75
76 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
78
79 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
81
82 async fn read_confirmed_block(
84 &self,
85 hash: CryptoHash,
86 ) -> Result<Option<ConfirmedBlock>, ViewError>;
87
88 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
90
91 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
93
94 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
96
97 async fn read_blob_states(
99 &self,
100 blob_ids: &[BlobId],
101 ) -> Result<Vec<Option<BlobState>>, ViewError>;
102
103 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
105
106 async fn write_blobs_and_certificate(
108 &self,
109 blobs: &[Blob],
110 certificate: &ConfirmedBlockCertificate,
111 ) -> Result<(), ViewError>;
112
113 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
116
117 async fn maybe_write_blob_states(
119 &self,
120 blob_ids: &[BlobId],
121 blob_state: BlobState,
122 ) -> Result<(), ViewError>;
123
124 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
126
127 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
129
130 async fn read_certificate(
132 &self,
133 hash: CryptoHash,
134 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
135
136 async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
138 &self,
139 hashes: I,
140 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
141
142 async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
149 &self,
150 hashes: I,
151 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError>;
152
153 async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
155
156 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
158
159 async fn read_events_from_index(
161 &self,
162 chain_id: &ChainId,
163 stream_id: &StreamId,
164 start_index: u32,
165 ) -> Result<Vec<IndexAndEvent>, ViewError>;
166
167 async fn write_events(
169 &self,
170 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
171 ) -> Result<(), ViewError>;
172
173 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
175
176 async fn write_network_description(
178 &self,
179 information: &NetworkDescription,
180 ) -> Result<(), ViewError>;
181
182 async fn committees_for(
184 &self,
185 epoch_range: RangeInclusive<Epoch>,
186 ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
187 if epoch_range.is_empty() {
189 return Ok(BTreeMap::new());
190 }
191 let min_epoch = epoch_range.start();
192 let max_epoch = epoch_range.end();
193 let read_committee = async |committee_hash| -> Result<Committee, ViewError> {
194 let blob_id = BlobId::new(committee_hash, BlobType::Committee);
195 let committee_blob = self
196 .read_blob(blob_id)
197 .await?
198 .ok_or_else(|| ViewError::NotFound(format!("blob {}", blob_id)))?;
199 Ok(bcs::from_bytes(committee_blob.bytes())?)
200 };
201
202 let network_description = self
203 .read_network_description()
204 .await?
205 .ok_or_else(|| ViewError::NotFound("NetworkDescription not found".to_owned()))?;
206 let admin_chain_id = network_description.admin_chain_id;
207 let mut result = BTreeMap::new();
208 if *min_epoch == Epoch::ZERO {
210 let genesis_committee =
211 read_committee(network_description.genesis_committee_blob_hash).await?;
212 result.insert(Epoch::ZERO, genesis_committee);
213 }
214
215 let start_index = min_epoch.0.max(1);
216 let epoch_creation_events = self
217 .read_events_from_index(
218 &admin_chain_id,
219 &StreamId::system(EPOCH_STREAM_NAME),
220 start_index,
221 )
222 .await?;
223
224 result.extend(
225 futures::future::try_join_all(
226 epoch_creation_events
227 .into_iter()
228 .take_while(|index_and_event| index_and_event.index <= max_epoch.0)
229 .map(|index_and_event| async move {
230 let epoch = Epoch::from(index_and_event.index);
231 let maybe_blob_hash = bcs::from_bytes::<CryptoHash>(&index_and_event.event);
232 let committee = read_committee(maybe_blob_hash?).await?;
233 Result::<_, ViewError>::Ok((epoch, committee))
234 }),
235 )
236 .await?,
237 );
238
239 Ok(result)
240 }
241
242 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
250 where
251 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
252 {
253 let id = description.id();
254 self.write_blob(&Blob::new_chain_description(&description))
256 .await?;
257 let mut chain = self.load_chain(id).await?;
258 assert!(!chain.is_active(), "Attempting to create a chain twice");
259 let current_time = self.clock().current_time();
260 chain.ensure_is_active(current_time).await?;
261 chain.save().await?;
262 Ok(())
263 }
264
265 fn wasm_runtime(&self) -> Option<WasmRuntime>;
267
268 async fn load_contract(
271 &self,
272 application_description: &ApplicationDescription,
273 txn_tracker: &TransactionTracker,
274 ) -> Result<UserContractCode, ExecutionError> {
275 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
276 let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
277 Some(content) => content.clone(),
278 None => self
279 .read_blob(contract_bytecode_blob_id)
280 .await?
281 .ok_or(ExecutionError::BlobsNotFound(vec![
282 contract_bytecode_blob_id,
283 ]))?
284 .into_content(),
285 };
286 let compressed_contract_bytecode = CompressedBytecode {
287 compressed_bytes: content.into_arc_bytes(),
288 };
289 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
290 let contract_bytecode =
291 linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
292 move |_| async move { compressed_contract_bytecode.decompress() },
293 )
294 .await
295 .join()
296 .await?;
297 match application_description.module_id.vm_runtime {
298 VmRuntime::Wasm => {
299 cfg_if::cfg_if! {
300 if #[cfg(with_wasm_runtime)] {
301 let Some(wasm_runtime) = self.wasm_runtime() else {
302 panic!("A Wasm runtime is required to load user applications.");
303 };
304 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
305 .await?
306 .into())
307 } else {
308 panic!(
309 "A Wasm runtime is required to load user applications. \
310 Please enable the `wasmer` or the `wasmtime` feature flags \
311 when compiling `linera-storage`."
312 );
313 }
314 }
315 }
316 VmRuntime::Evm => {
317 cfg_if::cfg_if! {
318 if #[cfg(with_revm)] {
319 let evm_runtime = EvmRuntime::Revm;
320 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
321 .into())
322 } else {
323 panic!(
324 "An Evm runtime is required to load user applications. \
325 Please enable the `revm` feature flag \
326 when compiling `linera-storage`."
327 );
328 }
329 }
330 }
331 }
332 }
333
334 async fn load_service(
337 &self,
338 application_description: &ApplicationDescription,
339 txn_tracker: &TransactionTracker,
340 ) -> Result<UserServiceCode, ExecutionError> {
341 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
342 let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
343 Some(content) => content.clone(),
344 None => self
345 .read_blob(service_bytecode_blob_id)
346 .await?
347 .ok_or(ExecutionError::BlobsNotFound(vec![
348 service_bytecode_blob_id,
349 ]))?
350 .into_content(),
351 };
352 let compressed_service_bytecode = CompressedBytecode {
353 compressed_bytes: content.into_arc_bytes(),
354 };
355 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
356 let service_bytecode = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
357 move |_| async move { compressed_service_bytecode.decompress() },
358 )
359 .await
360 .join()
361 .await?;
362 match application_description.module_id.vm_runtime {
363 VmRuntime::Wasm => {
364 cfg_if::cfg_if! {
365 if #[cfg(with_wasm_runtime)] {
366 let Some(wasm_runtime) = self.wasm_runtime() else {
367 panic!("A Wasm runtime is required to load user applications.");
368 };
369 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
370 .await?
371 .into())
372 } else {
373 panic!(
374 "A Wasm runtime is required to load user applications. \
375 Please enable the `wasmer` or the `wasmtime` feature flags \
376 when compiling `linera-storage`."
377 );
378 }
379 }
380 }
381 VmRuntime::Evm => {
382 cfg_if::cfg_if! {
383 if #[cfg(with_revm)] {
384 let evm_runtime = EvmRuntime::Revm;
385 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
386 .into())
387 } else {
388 panic!(
389 "An Evm runtime is required to load user applications. \
390 Please enable the `revm` feature flag \
391 when compiling `linera-storage`."
392 );
393 }
394 }
395 }
396 }
397 }
398
399 async fn block_exporter_context(
400 &self,
401 block_exporter_id: u32,
402 ) -> Result<Self::BlockExporterContext, ViewError>;
403}
404
/// Outcome of pairing a list of requested certificate hashes with what was read
/// for them; built by `ResultReadCertificates::new`.
pub enum ResultReadCertificates {
    /// Every requested certificate was found; holds the certificates that were read.
    Certificates(Vec<ConfirmedBlockCertificate>),
    /// At least one certificate was missing; holds the hashes that had no certificate.
    InvalidHashes(Vec<CryptoHash>),
}
410
411impl ResultReadCertificates {
412 pub fn new(
414 certificates: Vec<Option<ConfirmedBlockCertificate>>,
415 hashes: Vec<CryptoHash>,
416 ) -> Self {
417 let (certificates, invalid_hashes) = certificates
418 .into_iter()
419 .zip(hashes)
420 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
421 Some(cert) => itertools::Either::Left(cert),
422 None => itertools::Either::Right(hash),
423 });
424 if invalid_hashes.is_empty() {
425 Self::Certificates(certificates)
426 } else {
427 Self::InvalidHashes(invalid_hashes)
428 }
429 }
430}
431
/// Per-chain runtime context backed by a storage of type `S`.
///
/// When `S` implements `Storage`, this type implements [`ExecutionRuntimeContext`]
/// by delegating to the storage and by caching loaded application code.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// The underlying storage that reads and writes are delegated to.
    storage: S,
    /// The ID of the chain this context belongs to.
    chain_id: ChainId,
    /// The execution runtime configuration reported by this context.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Shared cache of loaded user contracts, keyed by application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Shared cache of loaded user services, keyed by application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
}
441
442#[cfg_attr(not(web), async_trait)]
443#[cfg_attr(web, async_trait(?Send))]
444impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
445where
446 S: Storage + Send + Sync,
447{
448 fn chain_id(&self) -> ChainId {
449 self.chain_id
450 }
451
452 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
453 self.execution_runtime_config
454 }
455
456 fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
457 &self.user_contracts
458 }
459
460 fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
461 &self.user_services
462 }
463
464 async fn get_user_contract(
465 &self,
466 description: &ApplicationDescription,
467 txn_tracker: &TransactionTracker,
468 ) -> Result<UserContractCode, ExecutionError> {
469 let application_id = description.into();
470 let pinned = self.user_contracts.pin_owned();
471 if let Some(contract) = pinned.get(&application_id) {
472 return Ok(contract.clone());
473 }
474 let contract = self.storage.load_contract(description, txn_tracker).await?;
475 pinned.insert(application_id, contract.clone());
476 Ok(contract)
477 }
478
479 async fn get_user_service(
480 &self,
481 description: &ApplicationDescription,
482 txn_tracker: &TransactionTracker,
483 ) -> Result<UserServiceCode, ExecutionError> {
484 let application_id = description.into();
485 let pinned = self.user_services.pin_owned();
486 if let Some(service) = pinned.get(&application_id) {
487 return Ok(service.clone());
488 }
489 let service = self.storage.load_service(description, txn_tracker).await?;
490 pinned.insert(application_id, service.clone());
491 Ok(service)
492 }
493
494 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
495 self.storage.read_blob(blob_id).await
496 }
497
498 async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
499 self.storage.read_event(event_id).await
500 }
501
502 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
503 self.storage.read_network_description().await
504 }
505
506 async fn committees_for(
507 &self,
508 epoch_range: RangeInclusive<Epoch>,
509 ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
510 self.storage.committees_for(epoch_range).await
511 }
512
513 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
514 self.storage.contains_blob(blob_id).await
515 }
516
517 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
518 self.storage.contains_event(event_id).await
519 }
520
521 #[cfg(with_testing)]
522 async fn add_blobs(
523 &self,
524 blobs: impl IntoIterator<Item = Blob> + Send,
525 ) -> Result<(), ViewError> {
526 let blobs = Vec::from_iter(blobs);
527 self.storage.write_blobs(&blobs).await
528 }
529
530 #[cfg(with_testing)]
531 async fn add_events(
532 &self,
533 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
534 ) -> Result<(), ViewError> {
535 self.storage.write_events(events).await
536 }
537}
538
/// A source of time that can also be waited on asynchronously.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Waits for the duration `delta`, as measured by this clock.
    async fn sleep(&self, delta: TimeDelta);

    /// Waits until this clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp);
}