#![deny(clippy::large_futures)]
mod db_storage;
use std::sync::Arc;
use async_trait::async_trait;
use dashmap::{mapref::entry::Entry, DashMap};
use linera_base::{
crypto::CryptoHash,
data_types::{
Amount, Blob, BlockHeight, CompressedBytecode, TimeDelta, Timestamp,
UserApplicationDescription,
},
hashed::Hashed,
identifiers::{BlobId, BlobType, ChainDescription, ChainId, EventId, Owner, UserApplicationId},
ownership::ChainOwnership,
vm::VmRuntime,
};
use linera_chain::{
types::{ConfirmedBlock, ConfirmedBlockCertificate},
ChainError, ChainStateView,
};
use linera_execution::{
committee::{Committee, Epoch},
system::SystemChannel,
BlobState, ChannelSubscription, ExecutionError, ExecutionRuntimeConfig,
ExecutionRuntimeContext, UserContractCode, UserServiceCode, WasmRuntime,
};
#[cfg(with_revm)]
use linera_execution::{
revm::{EvmContractModule, EvmServiceModule},
EvmRuntime,
};
#[cfg(with_wasm_runtime)]
use linera_execution::{WasmContractModule, WasmServiceModule};
use linera_views::{
context::Context,
views::{CryptoHashView, RootView, ViewError},
};
#[cfg(with_testing)]
pub use crate::db_storage::TestClock;
pub use crate::db_storage::{
list_all_blob_ids, list_all_chain_ids, ChainStatesFirstAssignment, DbStorage, WallClock,
};
#[cfg(with_metrics)]
pub use crate::db_storage::{
READ_CERTIFICATE_COUNTER, READ_HASHED_CONFIRMED_BLOCK_COUNTER, WRITE_CERTIFICATE_COUNTER,
};
/// Interface to the storage holding chain states, blobs, confirmed blocks, certificates
/// and events.
///
/// Implementors provide the primitive read/write operations. The trait itself supplies
/// default implementations for loading an active chain, creating a chain, and loading
/// user application code (contract and service).
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Storage: Sized {
    /// The view context of the chain states loaded from this storage. Its `Extra` data
    /// is a [`ChainRuntimeContext`] over this storage type.
    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;

    /// The clock type returned by [`Storage::clock`].
    type Clock: Clock;

    /// Returns the clock of this storage. `create_chain` reads the current time from it.
    fn clock(&self) -> &Self::Clock;

    /// Loads the chain state view of the chain `id`.
    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;

    /// Reports whether a blob is stored under `blob_id`.
    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;

    /// Returns blob IDs selected from `blob_ids`.
    ///
    /// NOTE(review): presumably the subset that is not present in storage; confirm
    /// against the implementors.
    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;

    /// Reports whether a blob state is stored under `blob_id`.
    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;

    /// Reads the hashed confirmed block identified by `hash`.
    async fn read_hashed_confirmed_block(
        &self,
        hash: CryptoHash,
    ) -> Result<Hashed<ConfirmedBlock>, ViewError>;

    /// Reads the blob identified by `blob_id`.
    async fn read_blob(&self, blob_id: BlobId) -> Result<Blob, ViewError>;

    /// Reads several blobs at once, yielding an `Option<Blob>` per result entry.
    ///
    /// NOTE(review): presumably one entry per requested ID, `None` when absent; confirm.
    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;

    /// Reads the blob state stored under `blob_id`.
    async fn read_blob_state(&self, blob_id: BlobId) -> Result<BlobState, ViewError>;

    /// Reads the blob states stored under `blob_ids`.
    async fn read_blob_states(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobState>, ViewError>;

    /// Reads a sequence of hashed confirmed blocks starting at the block `from`.
    ///
    /// NOTE(review): presumably follows the chain towards lower heights and returns at
    /// most `limit` blocks; confirm against the implementors.
    async fn read_hashed_confirmed_blocks_downward(
        &self,
        from: CryptoHash,
        limit: u32,
    ) -> Result<Vec<Hashed<ConfirmedBlock>>, ViewError>;

    /// Writes a single blob.
    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;

    /// Writes the given blobs together with a confirmed block certificate.
    async fn write_blobs_and_certificate(
        &self,
        blobs: &[Blob],
        certificate: &ConfirmedBlockCertificate,
    ) -> Result<(), ViewError>;

    /// Writes `blob_state` under `blob_id`.
    async fn write_blob_state(
        &self,
        blob_id: BlobId,
        blob_state: &BlobState,
    ) -> Result<(), ViewError>;

    /// Conditionally writes the given blobs, returning one `bool` per result entry.
    ///
    /// NOTE(review): the write condition and the meaning of each flag are decided by the
    /// implementor; confirm.
    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;

    /// Conditionally writes `blob_state` under `blob_id` and returns an [`Epoch`].
    ///
    /// NOTE(review): the write condition and the meaning of the returned epoch are
    /// decided by the implementor; confirm.
    async fn maybe_write_blob_state(
        &self,
        blob_id: BlobId,
        blob_state: BlobState,
    ) -> Result<Epoch, ViewError>;

    /// Conditionally writes `blob_state` under each of `blob_ids` and returns epochs.
    ///
    /// NOTE(review): the effect of `overwrite` and the meaning of the returned epochs are
    /// decided by the implementor; confirm.
    async fn maybe_write_blob_states(
        &self,
        blob_ids: &[BlobId],
        blob_state: BlobState,
        overwrite: bool,
    ) -> Result<Vec<Epoch>, ViewError>;

    /// Writes several blobs.
    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;

    /// Reports whether a certificate is stored for the block `hash`.
    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;

    /// Reads the confirmed block certificate identified by `hash`.
    async fn read_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, ViewError>;

    /// Reads the confirmed block certificates identified by `hashes`.
    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<ConfirmedBlockCertificate>, ViewError>;

    /// Reads the raw bytes of the event identified by `id`.
    async fn read_event(&self, id: EventId) -> Result<Vec<u8>, ViewError>;

    /// Writes a batch of events, each given as an ID and its raw bytes.
    async fn write_events(
        &self,
        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
    ) -> Result<(), ViewError>;

    /// Loads the chain `id` and returns it only if `ensure_is_active` succeeds on it.
    ///
    /// # Errors
    ///
    /// Fails if the chain cannot be loaded or if the activity check fails.
    async fn load_active_chain(
        &self,
        id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, linera_chain::ChainError>
    where
        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
    {
        let loaded_chain = self.load_chain(id).await?;
        loaded_chain.ensure_is_active()?;
        Ok(loaded_chain)
    }

    /// Initializes and saves the state of a new chain described by `description`.
    ///
    /// The chain manager is reset for a single `owner` at block height 0, using the
    /// current time of [`Storage::clock`] and the validators of `committee`. The system
    /// state receives the description, epoch zero, `admin_id`, `committee`, the
    /// single-owner ownership, `balance` and `timestamp`. When the new chain is not the
    /// admin chain itself, it is subscribed to the admin channel and registered as a
    /// subscriber on the admin chain, which is then saved. Finally the execution state
    /// hash is computed, recorded, and the chain is saved.
    ///
    /// # Panics
    ///
    /// Panics if the chain is already active.
    async fn create_chain(
        &self,
        committee: Committee,
        admin_id: ChainId,
        description: ChainDescription,
        owner: Owner,
        balance: Amount,
        timestamp: Timestamp,
    ) -> Result<(), ChainError>
    where
        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
    {
        let new_chain_id: ChainId = description.into();
        let mut chain = self.load_chain(new_chain_id).await?;
        assert!(!chain.is_active(), "Attempting to create a chain twice");

        // The manager is seeded with the storage clock's time, whereas the system state
        // below records the caller-provided `timestamp`.
        let clock_time = self.clock().current_time();
        chain.manager.reset(
            ChainOwnership::single(owner),
            BlockHeight(0),
            clock_time,
            committee.account_keys_and_weights(),
        )?;

        let system = &mut chain.execution_state.system;
        system.description.set(Some(description));
        system.epoch.set(Some(Epoch::ZERO));
        system.admin_id.set(Some(admin_id));
        system.committees.get_mut().insert(Epoch::ZERO, committee);
        system.ownership.set(ChainOwnership::single(owner));
        system.balance.set(balance);
        system.timestamp.set(timestamp);

        if new_chain_id != admin_id {
            // Record the subscription on the new chain's side.
            let admin_subscription = ChannelSubscription {
                chain_id: admin_id,
                name: SystemChannel::Admin.name(),
            };
            system.subscriptions.insert(&admin_subscription)?;

            // Record the new subscriber on the admin chain's side.
            let mut admin_chain = self.load_chain(admin_id).await?;
            let admin_channel_name = SystemChannel::Admin.full_name();
            {
                // Scoped so that the channel entry is released before `save` is called.
                let mut admin_channel = admin_chain
                    .channels
                    .try_load_entry_mut(&admin_channel_name)
                    .await?;
                admin_channel.subscribers.insert(&new_chain_id)?;
            }
            admin_chain.save().await?;
        }

        let execution_state_hash = chain.execution_state.crypto_hash().await?;
        chain.execution_state_hash.set(Some(execution_state_hash));
        chain.save().await?;
        Ok(())
    }

    /// Returns the Wasm runtime to use for Wasm applications, if one is available.
    fn wasm_runtime(&self) -> Option<WasmRuntime>;

    /// Loads the contract code of the application described by `application_description`.
    ///
    /// The contract bytecode blob is read from storage, decompressed through
    /// `linera_base::task::Blocking`, and turned into a module for the application's
    /// `vm_runtime`.
    ///
    /// # Panics
    ///
    /// Panics if the crate was compiled without support for the required VM, or if the
    /// application targets Wasm and [`Storage::wasm_runtime`] returns `None`.
    async fn load_contract(
        &self,
        application_description: &UserApplicationDescription,
    ) -> Result<UserContractCode, ExecutionError> {
        use linera_base::task::{Blocking, NoInput};

        let blob_id = BlobId::new(
            application_description.bytecode_id.contract_blob_hash,
            BlobType::ContractBytecode,
        );
        let blob = self.read_blob(blob_id).await?;
        let compressed = CompressedBytecode {
            compressed_bytes: blob.into_bytes().to_vec(),
        };
        // The bytecode is only consumed by the feature-gated branches below.
        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
        let bytecode = Blocking::<NoInput, _>::spawn(move |_| async move {
            compressed.decompress()
        })
        .await
        .join()
        .await?;

        match application_description.bytecode_id.vm_runtime {
            VmRuntime::Wasm => {
                cfg_if::cfg_if! {
                    if #[cfg(with_wasm_runtime)] {
                        let runtime = self
                            .wasm_runtime()
                            .expect("A Wasm runtime is required to load user applications.");
                        let module = WasmContractModule::new(bytecode, runtime).await?;
                        Ok(module.into())
                    } else {
                        panic!(
                            "A Wasm runtime is required to load user applications. \
                            Please enable the `wasmer` or the `wasmtime` feature flags \
                            when compiling `linera-storage`."
                        );
                    }
                }
            }
            VmRuntime::Evm => {
                cfg_if::cfg_if! {
                    if #[cfg(with_revm)] {
                        let module = EvmContractModule::new(bytecode, EvmRuntime::Revm).await?;
                        Ok(module.into())
                    } else {
                        panic!(
                            "An Evm runtime is required to load user applications. \
                            Please enable the `revm` feature flag \
                            when compiling `linera-storage`."
                        );
                    }
                }
            }
        }
    }

    /// Loads the service code of the application described by `application_description`.
    ///
    /// The service bytecode blob is read from storage, decompressed through
    /// `linera_base::task::Blocking`, and turned into a module for the application's
    /// `vm_runtime`.
    ///
    /// # Panics
    ///
    /// Panics if the crate was compiled without support for the required VM, or if the
    /// application targets Wasm and [`Storage::wasm_runtime`] returns `None`.
    async fn load_service(
        &self,
        application_description: &UserApplicationDescription,
    ) -> Result<UserServiceCode, ExecutionError> {
        use linera_base::task::{Blocking, NoInput};

        let blob_id = BlobId::new(
            application_description.bytecode_id.service_blob_hash,
            BlobType::ServiceBytecode,
        );
        let blob = self.read_blob(blob_id).await?;
        let compressed = CompressedBytecode {
            compressed_bytes: blob.into_bytes().to_vec(),
        };
        // The bytecode is only consumed by the feature-gated branches below.
        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
        let bytecode = Blocking::<NoInput, _>::spawn(move |_| async move {
            compressed.decompress()
        })
        .await
        .join()
        .await?;

        match application_description.bytecode_id.vm_runtime {
            VmRuntime::Wasm => {
                cfg_if::cfg_if! {
                    if #[cfg(with_wasm_runtime)] {
                        let runtime = self
                            .wasm_runtime()
                            .expect("A Wasm runtime is required to load user applications.");
                        let module = WasmServiceModule::new(bytecode, runtime).await?;
                        Ok(module.into())
                    } else {
                        panic!(
                            "A Wasm runtime is required to load user applications. \
                            Please enable the `wasmer` or the `wasmtime` feature flags \
                            when compiling `linera-storage`."
                        );
                    }
                }
            }
            VmRuntime::Evm => {
                cfg_if::cfg_if! {
                    if #[cfg(with_revm)] {
                        let module = EvmServiceModule::new(bytecode, EvmRuntime::Revm).await?;
                        Ok(module.into())
                    } else {
                        panic!(
                            "An Evm runtime is required to load user applications. \
                            Please enable the `revm` feature flag \
                            when compiling `linera-storage`."
                        );
                    }
                }
            }
        }
    }
}
/// Runtime context attached to the views of one chain.
///
/// It bundles a storage handle with the chain's identity, the execution runtime
/// configuration, and shared maps of already-loaded user application code.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// Storage used to read blobs and events and to load application bytecode.
    storage: S,
    /// ID of the chain this context belongs to.
    chain_id: ChainId,
    /// Configuration handed out through `execution_runtime_config()`.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Loaded contract code, keyed by application ID; shared between clones via `Arc`.
    user_contracts: Arc<DashMap<UserApplicationId, UserContractCode>>,
    /// Loaded service code, keyed by application ID; shared between clones via `Arc`.
    user_services: Arc<DashMap<UserApplicationId, UserServiceCode>>,
}
/// [`ExecutionRuntimeContext`] implementation that serves blobs, events and user
/// application code from the underlying [`Storage`], caching loaded application code in
/// the shared `user_contracts` / `user_services` maps.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
where
    S: Storage + Send + Sync,
{
    /// Returns the ID of the chain this context belongs to.
    fn chain_id(&self) -> ChainId {
        self.chain_id
    }

    /// Returns the execution runtime configuration of this context.
    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
        self.execution_runtime_config
    }

    /// Returns the shared map of loaded contract code.
    fn user_contracts(&self) -> &Arc<DashMap<UserApplicationId, UserContractCode>> {
        &self.user_contracts
    }

    /// Returns the shared map of loaded service code.
    fn user_services(&self) -> &Arc<DashMap<UserApplicationId, UserServiceCode>> {
        &self.user_services
    }

    /// Returns the contract code of the application, loading it from storage and caching
    /// it on first use.
    ///
    /// No map guard is held while the code is being loaded: a `DashMap` entry keeps its
    /// shard write-locked, and holding that lock across an `.await` would block every
    /// other task touching the same shard (and can deadlock a single-threaded executor).
    /// As a consequence, tasks racing on the first load of the same application may each
    /// load the code; the first one to finish populates the cache and all of them return
    /// the cached value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Storage::load_contract`.
    async fn get_user_contract(
        &self,
        description: &UserApplicationDescription,
    ) -> Result<UserContractCode, ExecutionError> {
        let application_id: UserApplicationId = description.into();

        // Fast path: the read guard is released at the end of this `if let`.
        if let Some(cached) = self.user_contracts.get(&application_id) {
            return Ok(cached.value().clone());
        }

        // Slow path: load without holding any lock on the map.
        let contract = self.storage.load_contract(description).await?;

        // The entry guard below is never held across an `.await`.
        match self.user_contracts.entry(application_id) {
            // Another task cached the code in the meantime: first writer wins.
            Entry::Occupied(existing) => Ok(existing.get().clone()),
            Entry::Vacant(slot) => {
                slot.insert(contract.clone());
                Ok(contract)
            }
        }
    }

    /// Returns the service code of the application, loading it from storage and caching
    /// it on first use.
    ///
    /// No map guard is held while the code is being loaded: a `DashMap` entry keeps its
    /// shard write-locked, and holding that lock across an `.await` would block every
    /// other task touching the same shard (and can deadlock a single-threaded executor).
    /// As a consequence, tasks racing on the first load of the same application may each
    /// load the code; the first one to finish populates the cache and all of them return
    /// the cached value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Storage::load_service`.
    async fn get_user_service(
        &self,
        description: &UserApplicationDescription,
    ) -> Result<UserServiceCode, ExecutionError> {
        let application_id: UserApplicationId = description.into();

        // Fast path: the read guard is released at the end of this `if let`.
        if let Some(cached) = self.user_services.get(&application_id) {
            return Ok(cached.value().clone());
        }

        // Slow path: load without holding any lock on the map.
        let service = self.storage.load_service(description).await?;

        // The entry guard below is never held across an `.await`.
        match self.user_services.entry(application_id) {
            // Another task cached the code in the meantime: first writer wins.
            Entry::Occupied(existing) => Ok(existing.get().clone()),
            Entry::Vacant(slot) => {
                slot.insert(service.clone());
                Ok(service)
            }
        }
    }

    /// Reads the blob `blob_id` from the underlying storage.
    async fn get_blob(&self, blob_id: BlobId) -> Result<Blob, ViewError> {
        self.storage.read_blob(blob_id).await
    }

    /// Reads the raw bytes of the event `event_id` from the underlying storage.
    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ViewError> {
        self.storage.read_event(event_id).await
    }

    /// Asks the underlying storage whether it contains the blob `blob_id`.
    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
        self.storage.contains_blob(blob_id).await
    }

    /// Test helper: writes the given blobs to the underlying storage.
    #[cfg(with_testing)]
    async fn add_blobs(
        &self,
        blobs: impl IntoIterator<Item = Blob> + Send,
    ) -> Result<(), ViewError> {
        // `write_blobs` takes a slice, so the iterator is collected first.
        let blobs = Vec::from_iter(blobs);
        self.storage.write_blobs(&blobs).await
    }

    /// Test helper: writes the given events to the underlying storage.
    #[cfg(with_testing)]
    async fn add_events(
        &self,
        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
    ) -> Result<(), ViewError> {
        self.storage.write_events(events).await
    }
}
/// A source of time that can also suspend the calling task.
///
/// `Storage::create_chain` reads the current time through this trait.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Suspends the caller for the duration `delta`.
    async fn sleep(&self, delta: TimeDelta);

    /// Suspends the caller until this clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp);
}