1#![deny(clippy::large_futures)]
7
8mod db_storage;
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use dashmap::{mapref::entry::Entry, DashMap};
14use linera_base::{
15 crypto::CryptoHash,
16 data_types::{
17 ApplicationDescription, Blob, ChainDescription, CompressedBytecode, NetworkDescription,
18 TimeDelta, Timestamp,
19 },
20 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
21 vm::VmRuntime,
22};
23use linera_chain::{
24 types::{ConfirmedBlock, ConfirmedBlockCertificate},
25 ChainError, ChainStateView,
26};
27#[cfg(with_revm)]
28use linera_execution::{
29 evm::revm::{EvmContractModule, EvmServiceModule},
30 EvmRuntime,
31};
32use linera_execution::{
33 BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, UserContractCode,
34 UserServiceCode, WasmRuntime,
35};
36#[cfg(with_wasm_runtime)]
37use linera_execution::{WasmContractModule, WasmServiceModule};
38use linera_views::{context::Context, views::RootView, ViewError};
39
40#[cfg(with_metrics)]
41pub use crate::db_storage::metrics;
42#[cfg(with_testing)]
43pub use crate::db_storage::TestClock;
44pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
45
/// Namespace name used by default.
// NOTE(review): presumably the fallback when no storage namespace is configured; the
// consumers of this constant are not visible in this file — confirm at the call sites.
pub const DEFAULT_NAMESPACE: &str = "table_linera";
48
49#[cfg_attr(not(web), async_trait)]
51#[cfg_attr(web, async_trait(?Send))]
52pub trait Storage: Sized {
53 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;
55
56 type Clock: Clock;
58
59 type BlockExporterContext: Context<Extra = u32> + Clone + Send + Sync + 'static;
61
62 fn clock(&self) -> &Self::Clock;
64
65 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
73
74 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
76
77 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
79
80 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
82
83 async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, ViewError>;
85
86 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
88
89 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
91
92 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
94
95 async fn read_blob_states(
97 &self,
98 blob_ids: &[BlobId],
99 ) -> Result<Vec<Option<BlobState>>, ViewError>;
100
101 async fn read_confirmed_blocks_downward(
103 &self,
104 from: CryptoHash,
105 limit: u32,
106 ) -> Result<Vec<ConfirmedBlock>, ViewError>;
107
108 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
110
111 async fn write_blobs_and_certificate(
113 &self,
114 blobs: &[Blob],
115 certificate: &ConfirmedBlockCertificate,
116 ) -> Result<(), ViewError>;
117
118 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
121
122 async fn maybe_write_blob_states(
124 &self,
125 blob_ids: &[BlobId],
126 blob_state: BlobState,
127 ) -> Result<(), ViewError>;
128
129 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
131
132 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
134
135 async fn read_certificate(
137 &self,
138 hash: CryptoHash,
139 ) -> Result<ConfirmedBlockCertificate, ViewError>;
140
141 async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
143 &self,
144 hashes: I,
145 ) -> Result<Vec<ConfirmedBlockCertificate>, ViewError>;
146
147 async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
149
150 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
152
153 async fn read_events_from_index(
155 &self,
156 chain_id: &ChainId,
157 stream_id: &StreamId,
158 start_index: u32,
159 ) -> Result<Vec<IndexAndEvent>, ViewError>;
160
161 async fn write_events(
163 &self,
164 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
165 ) -> Result<(), ViewError>;
166
167 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
169
170 async fn write_network_description(
172 &self,
173 information: &NetworkDescription,
174 ) -> Result<(), ViewError>;
175
176 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
184 where
185 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
186 {
187 let id = description.id();
188 self.write_blob(&Blob::new_chain_description(&description))
190 .await?;
191 let mut chain = self.load_chain(id).await?;
192 assert!(!chain.is_active(), "Attempting to create a chain twice");
193 let current_time = self.clock().current_time();
194 chain.ensure_is_active(current_time).await?;
195 chain.save().await?;
196 Ok(())
197 }
198
199 fn wasm_runtime(&self) -> Option<WasmRuntime>;
201
202 async fn load_contract(
205 &self,
206 application_description: &ApplicationDescription,
207 ) -> Result<UserContractCode, ExecutionError> {
208 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
209 let contract_blob = self.read_blob(contract_bytecode_blob_id).await?.ok_or(
210 ExecutionError::BlobsNotFound(vec![contract_bytecode_blob_id]),
211 )?;
212 let compressed_contract_bytecode = CompressedBytecode {
213 compressed_bytes: contract_blob.into_bytes().to_vec(),
214 };
215 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
216 let contract_bytecode =
217 linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
218 move |_| async move { compressed_contract_bytecode.decompress() },
219 )
220 .await
221 .join()
222 .await?;
223 match application_description.module_id.vm_runtime {
224 VmRuntime::Wasm => {
225 cfg_if::cfg_if! {
226 if #[cfg(with_wasm_runtime)] {
227 let Some(wasm_runtime) = self.wasm_runtime() else {
228 panic!("A Wasm runtime is required to load user applications.");
229 };
230 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
231 .await?
232 .into())
233 } else {
234 panic!(
235 "A Wasm runtime is required to load user applications. \
236 Please enable the `wasmer` or the `wasmtime` feature flags \
237 when compiling `linera-storage`."
238 );
239 }
240 }
241 }
242 VmRuntime::Evm => {
243 cfg_if::cfg_if! {
244 if #[cfg(with_revm)] {
245 let evm_runtime = EvmRuntime::Revm;
246 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)
247 .await?
248 .into())
249 } else {
250 panic!(
251 "An Evm runtime is required to load user applications. \
252 Please enable the `revm` feature flag \
253 when compiling `linera-storage`."
254 );
255 }
256 }
257 }
258 }
259 }
260
261 async fn load_service(
264 &self,
265 application_description: &ApplicationDescription,
266 ) -> Result<UserServiceCode, ExecutionError> {
267 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
268 let service_blob = self.read_blob(service_bytecode_blob_id).await?.ok_or(
269 ExecutionError::BlobsNotFound(vec![service_bytecode_blob_id]),
270 )?;
271 let compressed_service_bytecode = CompressedBytecode {
272 compressed_bytes: service_blob.into_bytes().to_vec(),
273 };
274 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
275 let service_bytecode = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
276 move |_| async move { compressed_service_bytecode.decompress() },
277 )
278 .await
279 .join()
280 .await?;
281 match application_description.module_id.vm_runtime {
282 VmRuntime::Wasm => {
283 cfg_if::cfg_if! {
284 if #[cfg(with_wasm_runtime)] {
285 let Some(wasm_runtime) = self.wasm_runtime() else {
286 panic!("A Wasm runtime is required to load user applications.");
287 };
288 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
289 .await?
290 .into())
291 } else {
292 panic!(
293 "A Wasm runtime is required to load user applications. \
294 Please enable the `wasmer` or the `wasmtime` feature flags \
295 when compiling `linera-storage`."
296 );
297 }
298 }
299 }
300 VmRuntime::Evm => {
301 cfg_if::cfg_if! {
302 if #[cfg(with_revm)] {
303 let evm_runtime = EvmRuntime::Revm;
304 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)
305 .await?
306 .into())
307 } else {
308 panic!(
309 "An Evm runtime is required to load user applications. \
310 Please enable the `revm` feature flag \
311 when compiling `linera-storage`."
312 );
313 }
314 }
315 }
316 }
317 }
318
319 async fn block_exporter_context(
320 &self,
321 block_exporter_id: u32,
322 ) -> Result<Self::BlockExporterContext, ViewError>;
323}
324
/// Per-chain runtime context, used as the `Extra` data of [`Storage::Context`].
///
/// Its `ExecutionRuntimeContext` implementation forwards blob, event and network
/// description accesses to the wrapped storage and caches loaded user application code.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// The storage that blob, event and bytecode accesses are delegated to.
    storage: S,
    /// The ID of the chain this context belongs to; returned by `chain_id()`.
    chain_id: ChainId,
    /// The execution runtime configuration; returned by `execution_runtime_config()`.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Cache of loaded contract code, keyed by application ID. It sits behind an `Arc`, so
    /// clones of this context share the same map.
    user_contracts: Arc<DashMap<ApplicationId, UserContractCode>>,
    /// Cache of loaded service code, keyed by application ID. It sits behind an `Arc`, so
    /// clones of this context share the same map.
    user_services: Arc<DashMap<ApplicationId, UserServiceCode>>,
}
334
335#[cfg_attr(not(web), async_trait)]
336#[cfg_attr(web, async_trait(?Send))]
337impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
338where
339 S: Storage + Send + Sync,
340{
341 fn chain_id(&self) -> ChainId {
342 self.chain_id
343 }
344
345 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
346 self.execution_runtime_config
347 }
348
349 fn user_contracts(&self) -> &Arc<DashMap<ApplicationId, UserContractCode>> {
350 &self.user_contracts
351 }
352
353 fn user_services(&self) -> &Arc<DashMap<ApplicationId, UserServiceCode>> {
354 &self.user_services
355 }
356
357 async fn get_user_contract(
358 &self,
359 description: &ApplicationDescription,
360 ) -> Result<UserContractCode, ExecutionError> {
361 match self.user_contracts.entry(description.into()) {
362 Entry::Occupied(entry) => Ok(entry.get().clone()),
363 Entry::Vacant(entry) => {
364 let contract = self.storage.load_contract(description).await?;
365 entry.insert(contract.clone());
366 Ok(contract)
367 }
368 }
369 }
370
371 async fn get_user_service(
372 &self,
373 description: &ApplicationDescription,
374 ) -> Result<UserServiceCode, ExecutionError> {
375 match self.user_services.entry(description.into()) {
376 Entry::Occupied(entry) => Ok(entry.get().clone()),
377 Entry::Vacant(entry) => {
378 let service = self.storage.load_service(description).await?;
379 entry.insert(service.clone());
380 Ok(service)
381 }
382 }
383 }
384
385 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
386 self.storage.read_blob(blob_id).await
387 }
388
389 async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
390 self.storage.read_event(event_id).await
391 }
392
393 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
394 self.storage.read_network_description().await
395 }
396
397 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
398 self.storage.contains_blob(blob_id).await
399 }
400
401 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
402 self.storage.contains_event(event_id).await
403 }
404
405 #[cfg(with_testing)]
406 async fn add_blobs(
407 &self,
408 blobs: impl IntoIterator<Item = Blob> + Send,
409 ) -> Result<(), ViewError> {
410 let blobs = Vec::from_iter(blobs);
411 self.storage.write_blobs(&blobs).await
412 }
413
414 #[cfg(with_testing)]
415 async fn add_events(
416 &self,
417 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
418 ) -> Result<(), ViewError> {
419 self.storage.write_events(events).await
420 }
421}
422
/// A source of time, used as the [`Storage::Clock`] associated type.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Sleeps for the given `delta`, as measured by this clock.
    async fn sleep(&self, delta: TimeDelta);

    /// Sleeps until the given `timestamp`, as measured by this clock.
    async fn sleep_until(&self, timestamp: Timestamp);
}