1#![deny(clippy::large_futures)]
7
8mod db_storage;
9
10use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc};
11
12use async_trait::async_trait;
13use dashmap::DashMap;
14use itertools::Itertools;
15use linera_base::{
16 crypto::CryptoHash,
17 data_types::{
18 ApplicationDescription, Blob, ChainDescription, CompressedBytecode, Epoch,
19 NetworkDescription, TimeDelta, Timestamp,
20 },
21 identifiers::{ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent, StreamId},
22 vm::VmRuntime,
23};
24use linera_chain::{
25 types::{ConfirmedBlock, ConfirmedBlockCertificate},
26 ChainError, ChainStateView,
27};
28use linera_execution::{
29 committee::Committee, system::EPOCH_STREAM_NAME, BlobState, ExecutionError,
30 ExecutionRuntimeConfig, ExecutionRuntimeContext, UserContractCode, UserServiceCode,
31 WasmRuntime,
32};
33#[cfg(with_revm)]
34use linera_execution::{
35 evm::revm::{EvmContractModule, EvmServiceModule},
36 EvmRuntime,
37};
38#[cfg(with_wasm_runtime)]
39use linera_execution::{WasmContractModule, WasmServiceModule};
40use linera_views::{context::Context, views::RootView, ViewError};
41
42#[cfg(with_metrics)]
43pub use crate::db_storage::metrics;
44#[cfg(with_testing)]
45pub use crate::db_storage::TestClock;
46pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
47
48pub const DEFAULT_NAMESPACE: &str = "table_linera";
50
51#[cfg_attr(not(web), async_trait)]
53#[cfg_attr(web, async_trait(?Send))]
54pub trait Storage: Sized {
55 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;
57
58 type Clock: Clock;
60
61 type BlockExporterContext: Context<Extra = u32> + Clone + Send + Sync + 'static;
63
64 fn clock(&self) -> &Self::Clock;
66
67 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
75
76 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
78
79 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
81
82 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
84
85 async fn read_confirmed_block(
87 &self,
88 hash: CryptoHash,
89 ) -> Result<Option<ConfirmedBlock>, ViewError>;
90
91 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
93
94 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
96
97 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
99
100 async fn read_blob_states(
102 &self,
103 blob_ids: &[BlobId],
104 ) -> Result<Vec<Option<BlobState>>, ViewError>;
105
106 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
108
109 async fn write_blobs_and_certificate(
111 &self,
112 blobs: &[Blob],
113 certificate: &ConfirmedBlockCertificate,
114 ) -> Result<(), ViewError>;
115
116 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
119
120 async fn maybe_write_blob_states(
122 &self,
123 blob_ids: &[BlobId],
124 blob_state: BlobState,
125 ) -> Result<(), ViewError>;
126
127 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
129
130 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
132
133 async fn read_certificate(
135 &self,
136 hash: CryptoHash,
137 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
138
139 async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
141 &self,
142 hashes: I,
143 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
144
145 async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
147
148 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
150
151 async fn read_events_from_index(
153 &self,
154 chain_id: &ChainId,
155 stream_id: &StreamId,
156 start_index: u32,
157 ) -> Result<Vec<IndexAndEvent>, ViewError>;
158
159 async fn write_events(
161 &self,
162 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
163 ) -> Result<(), ViewError>;
164
165 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
167
168 async fn write_network_description(
170 &self,
171 information: &NetworkDescription,
172 ) -> Result<(), ViewError>;
173
174 async fn committees_for(
176 &self,
177 epoch_range: RangeInclusive<Epoch>,
178 ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
179 if epoch_range.is_empty() {
181 return Ok(BTreeMap::new());
182 }
183 let min_epoch = epoch_range.start();
184 let max_epoch = epoch_range.end();
185 let read_committee = async |committee_hash| -> Result<Committee, ViewError> {
186 let blob_id = BlobId::new(committee_hash, BlobType::Committee);
187 let committee_blob = self
188 .read_blob(blob_id)
189 .await?
190 .ok_or_else(|| ViewError::NotFound(format!("blob {}", blob_id)))?;
191 Ok(bcs::from_bytes(committee_blob.bytes())?)
192 };
193
194 let network_description = self
195 .read_network_description()
196 .await?
197 .ok_or_else(|| ViewError::NotFound("NetworkDescription not found".to_owned()))?;
198 let admin_chain_id = network_description.admin_chain_id;
199 let mut result = BTreeMap::new();
200 if *min_epoch == Epoch::ZERO {
202 let genesis_committee =
203 read_committee(network_description.genesis_committee_blob_hash).await?;
204 result.insert(Epoch::ZERO, genesis_committee);
205 }
206
207 let start_index = min_epoch.0.max(1);
208 let epoch_creation_events = self
209 .read_events_from_index(
210 &admin_chain_id,
211 &StreamId::system(EPOCH_STREAM_NAME),
212 start_index,
213 )
214 .await?;
215
216 result.extend(
217 futures::future::try_join_all(
218 epoch_creation_events
219 .into_iter()
220 .take_while(|index_and_event| index_and_event.index <= max_epoch.0)
221 .map(|index_and_event| async move {
222 let epoch = Epoch::from(index_and_event.index);
223 let maybe_blob_hash = bcs::from_bytes::<CryptoHash>(&index_and_event.event);
224 let committee = read_committee(maybe_blob_hash?).await?;
225 Result::<_, ViewError>::Ok((epoch, committee))
226 }),
227 )
228 .await?,
229 );
230
231 Ok(result)
232 }
233
234 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
242 where
243 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
244 {
245 let id = description.id();
246 self.write_blob(&Blob::new_chain_description(&description))
248 .await?;
249 let mut chain = self.load_chain(id).await?;
250 assert!(!chain.is_active(), "Attempting to create a chain twice");
251 let current_time = self.clock().current_time();
252 chain.ensure_is_active(current_time).await?;
253 chain.save().await?;
254 Ok(())
255 }
256
257 fn wasm_runtime(&self) -> Option<WasmRuntime>;
259
260 async fn load_contract(
263 &self,
264 application_description: &ApplicationDescription,
265 ) -> Result<UserContractCode, ExecutionError> {
266 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
267 let contract_blob = self.read_blob(contract_bytecode_blob_id).await?.ok_or(
268 ExecutionError::BlobsNotFound(vec![contract_bytecode_blob_id]),
269 )?;
270 let compressed_contract_bytecode = CompressedBytecode {
271 compressed_bytes: contract_blob.into_bytes().to_vec(),
272 };
273 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
274 let contract_bytecode =
275 linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
276 move |_| async move { compressed_contract_bytecode.decompress() },
277 )
278 .await
279 .join()
280 .await?;
281 match application_description.module_id.vm_runtime {
282 VmRuntime::Wasm => {
283 cfg_if::cfg_if! {
284 if #[cfg(with_wasm_runtime)] {
285 let Some(wasm_runtime) = self.wasm_runtime() else {
286 panic!("A Wasm runtime is required to load user applications.");
287 };
288 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
289 .await?
290 .into())
291 } else {
292 panic!(
293 "A Wasm runtime is required to load user applications. \
294 Please enable the `wasmer` or the `wasmtime` feature flags \
295 when compiling `linera-storage`."
296 );
297 }
298 }
299 }
300 VmRuntime::Evm => {
301 cfg_if::cfg_if! {
302 if #[cfg(with_revm)] {
303 let evm_runtime = EvmRuntime::Revm;
304 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)
305 .await?
306 .into())
307 } else {
308 panic!(
309 "An Evm runtime is required to load user applications. \
310 Please enable the `revm` feature flag \
311 when compiling `linera-storage`."
312 );
313 }
314 }
315 }
316 }
317 }
318
319 async fn load_service(
322 &self,
323 application_description: &ApplicationDescription,
324 ) -> Result<UserServiceCode, ExecutionError> {
325 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
326 let service_blob = self.read_blob(service_bytecode_blob_id).await?.ok_or(
327 ExecutionError::BlobsNotFound(vec![service_bytecode_blob_id]),
328 )?;
329 let compressed_service_bytecode = CompressedBytecode {
330 compressed_bytes: service_blob.into_bytes().to_vec(),
331 };
332 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
333 let service_bytecode = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
334 move |_| async move { compressed_service_bytecode.decompress() },
335 )
336 .await
337 .join()
338 .await?;
339 match application_description.module_id.vm_runtime {
340 VmRuntime::Wasm => {
341 cfg_if::cfg_if! {
342 if #[cfg(with_wasm_runtime)] {
343 let Some(wasm_runtime) = self.wasm_runtime() else {
344 panic!("A Wasm runtime is required to load user applications.");
345 };
346 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
347 .await?
348 .into())
349 } else {
350 panic!(
351 "A Wasm runtime is required to load user applications. \
352 Please enable the `wasmer` or the `wasmtime` feature flags \
353 when compiling `linera-storage`."
354 );
355 }
356 }
357 }
358 VmRuntime::Evm => {
359 cfg_if::cfg_if! {
360 if #[cfg(with_revm)] {
361 let evm_runtime = EvmRuntime::Revm;
362 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)
363 .await?
364 .into())
365 } else {
366 panic!(
367 "An Evm runtime is required to load user applications. \
368 Please enable the `revm` feature flag \
369 when compiling `linera-storage`."
370 );
371 }
372 }
373 }
374 }
375 }
376
377 async fn block_exporter_context(
378 &self,
379 block_exporter_id: u32,
380 ) -> Result<Self::BlockExporterContext, ViewError>;
381}
382
383pub enum ResultReadCertificates {
385 Certificates(Vec<ConfirmedBlockCertificate>),
386 InvalidHashes(Vec<CryptoHash>),
387}
388
389impl ResultReadCertificates {
390 pub fn new(
392 certificates: Vec<Option<ConfirmedBlockCertificate>>,
393 hashes: Vec<CryptoHash>,
394 ) -> Self {
395 let (certificates, invalid_hashes) = certificates
396 .into_iter()
397 .zip(hashes)
398 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
399 Some(cert) => itertools::Either::Left(cert),
400 None => itertools::Either::Right(hash),
401 });
402 if invalid_hashes.is_empty() {
403 Self::Certificates(certificates)
404 } else {
405 Self::InvalidHashes(invalid_hashes)
406 }
407 }
408}
409
410#[derive(Clone)]
412pub struct ChainRuntimeContext<S> {
413 storage: S,
414 chain_id: ChainId,
415 execution_runtime_config: ExecutionRuntimeConfig,
416 user_contracts: Arc<DashMap<ApplicationId, UserContractCode>>,
417 user_services: Arc<DashMap<ApplicationId, UserServiceCode>>,
418}
419
420#[cfg_attr(not(web), async_trait)]
421#[cfg_attr(web, async_trait(?Send))]
422impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
423where
424 S: Storage + Send + Sync,
425{
426 fn chain_id(&self) -> ChainId {
427 self.chain_id
428 }
429
430 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
431 self.execution_runtime_config
432 }
433
434 fn user_contracts(&self) -> &Arc<DashMap<ApplicationId, UserContractCode>> {
435 &self.user_contracts
436 }
437
438 fn user_services(&self) -> &Arc<DashMap<ApplicationId, UserServiceCode>> {
439 &self.user_services
440 }
441
442 async fn get_user_contract(
443 &self,
444 description: &ApplicationDescription,
445 ) -> Result<UserContractCode, ExecutionError> {
446 let application_id = description.into();
447 if let Some(contract) = self.user_contracts.get(&application_id) {
448 return Ok(contract.clone());
449 }
450 let contract = self.storage.load_contract(description).await?;
451 self.user_contracts.insert(application_id, contract.clone());
452 Ok(contract)
453 }
454
455 async fn get_user_service(
456 &self,
457 description: &ApplicationDescription,
458 ) -> Result<UserServiceCode, ExecutionError> {
459 let application_id = description.into();
460 if let Some(service) = self.user_services.get(&application_id) {
461 return Ok(service.clone());
462 }
463 let service = self.storage.load_service(description).await?;
464 self.user_services.insert(application_id, service.clone());
465 Ok(service)
466 }
467
468 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
469 self.storage.read_blob(blob_id).await
470 }
471
472 async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
473 self.storage.read_event(event_id).await
474 }
475
476 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
477 self.storage.read_network_description().await
478 }
479
480 async fn committees_for(
481 &self,
482 epoch_range: RangeInclusive<Epoch>,
483 ) -> Result<BTreeMap<Epoch, Committee>, ViewError> {
484 self.storage.committees_for(epoch_range).await
485 }
486
487 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
488 self.storage.contains_blob(blob_id).await
489 }
490
491 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
492 self.storage.contains_event(event_id).await
493 }
494
495 #[cfg(with_testing)]
496 async fn add_blobs(
497 &self,
498 blobs: impl IntoIterator<Item = Blob> + Send,
499 ) -> Result<(), ViewError> {
500 let blobs = Vec::from_iter(blobs);
501 self.storage.write_blobs(&blobs).await
502 }
503
504 #[cfg(with_testing)]
505 async fn add_events(
506 &self,
507 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
508 ) -> Result<(), ViewError> {
509 self.storage.write_events(events).await
510 }
511}
512
513#[cfg_attr(not(web), async_trait)]
515#[cfg_attr(web, async_trait(?Send))]
516pub trait Clock {
517 fn current_time(&self) -> Timestamp;
518
519 async fn sleep(&self, delta: TimeDelta);
520
521 async fn sleep_until(&self, timestamp: Timestamp);
522}