#[cfg(test)]
#[path = "./unit_tests/system_tests.rs"]
mod tests;
#[cfg(with_metrics)]
use std::sync::LazyLock;
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
mem,
};
use custom_debug_derive::Debug;
use linera_base::{
crypto::CryptoHash,
data_types::{
Amount, ApplicationPermissions, Blob, BlobContent, BlockHeight, Epoch, OracleResponse,
Timestamp,
},
ensure, hex_debug,
identifiers::{
Account, AccountOwner, BlobId, BlobType, ChainDescription, ChainId, EventId, MessageId,
ModuleId, StreamId,
},
ownership::{ChainOwnership, TimeoutConfig},
};
use linera_views::{
context::Context,
map_view::{HashedMapView, MapView},
register_view::HashedRegisterView,
set_view::HashedSetView,
views::{ClonableView, HashableView, View, ViewError},
};
use serde::{Deserialize, Serialize};
#[cfg(with_metrics)]
use {linera_base::prometheus_util::register_int_counter_vec, prometheus::IntCounterVec};
#[cfg(test)]
use crate::test_utils::SystemExecutionState;
use crate::{
committee::Committee, ApplicationDescription, ApplicationId, ExecutionError,
ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
QueryContext, QueryOutcome, ResourceController, TransactionTracker,
};
/// Message index associated with the `OpenChain` message.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
pub static OPEN_CHAIN_MESSAGE_INDEX: u32 = 0;
/// Name of the system event stream on which `AdminOperation::CreateCommittee` emits an
/// event for each new epoch; `SystemOperation::ProcessNewEpoch` reads from it.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// Name of the system event stream on which `AdminOperation::RemoveCommittee` emits an
/// event for each removed epoch; `SystemOperation::ProcessRemovedEpoch` reads from it.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
/// Counter incremented every time `execute_operation` handles an `OpenChain` operation.
/// Registered lazily on first use and declared without any labels.
#[cfg(with_metrics)]
static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"open_chain_count",
"The number of times the `OpenChain` operation was executed",
&[],
)
});
/// A view accessing the system-level execution state of a chain.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
/// How the chain was created. `None` until `initialize_chain` sets it; `is_active`
/// requires it to be set.
pub description: HashedRegisterView<C, Option<ChainDescription>>,
/// The current epoch, used as the key into `committees` by `current_committee`.
pub epoch: HashedRegisterView<C, Option<Epoch>>,
/// The ID of the admin chain. Admin operations are only accepted when it equals the
/// ID of the chain executing them.
pub admin_id: HashedRegisterView<C, Option<ChainId>>,
/// The known committees, indexed by epoch.
pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
/// The ownership configuration of the chain.
pub ownership: HashedRegisterView<C, ChainOwnership>,
/// The balance of the chain itself, i.e. of the `AccountOwner::CHAIN` account.
pub balance: HashedRegisterView<C, Amount>,
/// The balances of all other account owners. `debit` removes an entry once it
/// reaches zero.
pub balances: HashedMapView<C, AccountOwner, Amount>,
/// A timestamp for the chain; set by `initialize_chain`.
pub timestamp: HashedRegisterView<C, Timestamp>,
/// Whether the chain has been closed; set to `true` by `close_chain`.
pub closed: HashedRegisterView<C, bool>,
/// The application permissions configured for this chain.
pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
/// The IDs of the blobs recorded by `blob_used` and `blob_published`.
pub used_blobs: HashedSetView<C, BlobId>,
/// Event-stream subscription records, keyed by publishing chain and stream.
pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
}
/// The subscription record stored for one `(ChainId, StreamId)` pair in
/// `SystemExecutionStateView::event_subscriptions`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct EventSubscriptions {
/// The next event index for the stream. `SystemOperation::UpdateStreams` only ever
/// raises this value (it takes the maximum of the old and the new index).
pub next_index: u32,
/// A set of application IDs.
// NOTE(review): presumably the applications subscribed to the stream — not
// established by the code visible here.
pub applications: BTreeSet<ApplicationId>,
}
/// The initial configuration of a newly opened chain. `open_chain` validates it against
/// the parent chain's state and `initialize_chain` copies it into the new chain's state.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
/// The ownership configuration of the new chain.
pub ownership: ChainOwnership,
/// The admin chain ID; `open_chain` requires it to equal the parent's admin ID.
pub admin_id: ChainId,
/// The initial epoch; `open_chain` requires it to equal the parent's current epoch.
pub epoch: Epoch,
/// The initial committees; `open_chain` requires them to equal the parent's committees.
pub committees: BTreeMap<Epoch, Committee>,
/// The initial chain balance; `open_chain` debits it from the parent's chain balance.
pub balance: Amount,
/// The initial application permissions of the new chain.
pub application_permissions: ApplicationPermissions,
}
/// A system operation, executed by `SystemExecutionStateView::execute_operation`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
/// Debits `amount` from `owner` on this chain and, unless the recipient is
/// `Recipient::Burn`, sends a `Credit` message to the recipient account.
Transfer {
owner: AccountOwner,
recipient: Recipient,
amount: Amount,
},
/// Sends a `Withdraw` message to chain `target_id`, asking it to debit `owner` there
/// and pass the amount on to `recipient`.
Claim {
owner: AccountOwner,
target_id: ChainId,
recipient: Recipient,
amount: Amount,
},
/// Opens a new child chain with the given configuration, funded from this chain's
/// balance.
OpenChain(OpenChainConfig),
/// Marks this chain as closed.
CloseChain,
/// Replaces the chain's ownership configuration with the given values.
ChangeOwnership {
#[debug(skip_if = Vec::is_empty)]
super_owners: Vec<AccountOwner>,
#[debug(skip_if = Vec::is_empty)]
owners: Vec<(AccountOwner, u64)>,
multi_leader_rounds: u32,
open_multi_leader_rounds: bool,
timeout_config: TimeoutConfig,
},
/// Replaces the chain's application permissions.
ChangeApplicationPermissions(ApplicationPermissions),
/// Records the bytecode blobs of `module_id` as published on this chain.
PublishModule { module_id: ModuleId },
/// Records the data blob with the given hash as published on this chain.
PublishDataBlob { blob_hash: CryptoHash },
/// Reads a blob and records it as used; reads of `BlobType::Data` blobs are charged
/// through the resource controller.
ReadBlob { blob_id: BlobId },
/// Creates a new application from `module_id`.
CreateApplication {
module_id: ModuleId,
#[serde(with = "serde_bytes")]
#[debug(with = "hex_debug")]
parameters: Vec<u8>,
/// Returned to the caller of `execute_operation` together with the new
/// application's ID.
#[serde(with = "serde_bytes")]
#[debug(with = "hex_debug", skip_if = Vec::is_empty)]
instantiation_argument: Vec<u8>,
#[debug(skip_if = Vec::is_empty)]
required_application_ids: Vec<ApplicationId>,
},
/// An operation that is only accepted on the admin chain.
Admin(AdminOperation),
/// Adopts the committee of the given epoch, which must directly follow the current
/// one, based on the event found on the admin chain's epoch stream.
ProcessNewEpoch(Epoch),
/// Removes the committee of the given epoch, recording the corresponding event from
/// the admin chain's removed-epoch stream.
ProcessRemovedEpoch(Epoch),
/// Raises the stored next event index for each listed `(chain, stream)` pair.
UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
/// Operations that `execute_operation` rejects with `AdminOperationOnNonAdminChain`
/// unless the executing chain is the admin chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
/// Records the committee blob with the given hash as published on this chain.
PublishCommitteeBlob { blob_hash: CryptoHash },
/// Registers the committee stored in the given blob for `epoch`, which must directly
/// follow the current epoch, makes it the current epoch and emits an event on the
/// `EPOCH_STREAM_NAME` stream.
CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
/// Removes the committee of `epoch` (which must exist) and emits an event on the
/// `REMOVED_EPOCH_STREAM_NAME` stream.
RemoveCommittee { epoch: Epoch },
}
/// A system message, handled by `SystemExecutionStateView::execute_message`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
/// Credits `amount` to `target`, or to `source` instead when the message context is
/// marked as bouncing.
Credit {
target: AccountOwner,
amount: Amount,
source: AccountOwner,
},
/// Debits `amount` from `owner` on the receiving chain and, unless the recipient is
/// `Recipient::Burn`, sends a `Credit` message to the recipient account.
Withdraw {
owner: AccountOwner,
amount: Amount,
recipient: Recipient,
},
/// Produced by `open_chain` for the new child chain. `execute_message` does nothing
/// for this message.
OpenChain(Box<OpenChainConfig>),
/// `execute_message` does nothing for this message.
ApplicationCreated,
}
/// A query to the system state. It carries no data; `handle_query` ignores its value.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
/// The response to a `SystemQuery`, as built by `handle_query`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
/// The chain ID taken from the query context.
pub chain_id: ChainId,
/// The chain's own balance at the time of the query.
pub balance: Amount,
}
/// The recipient of a transfer or claim.
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
pub enum Recipient {
/// No recipient: the source is debited and no `Credit` message is sent.
Burn,
/// The amount is sent in a `Credit` message to the account's chain, for the
/// account's owner.
Account(Account),
}
impl Recipient {
    /// Builds the recipient wrapping `Account::chain(chain_id)`.
    pub fn chain(chain_id: ChainId) -> Recipient {
        let account = Account::chain(chain_id);
        Self::Account(account)
    }

    /// Builds the recipient for the root chain with the given index (testing only).
    #[cfg(with_testing)]
    pub fn root(index: u32) -> Recipient {
        let chain_id = ChainId::root(index);
        Self::chain(chain_id)
    }
}
/// Optional user data: either nothing or exactly 32 bytes.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
impl UserData {
    /// Builds user data from an optional string.
    ///
    /// `None` maps to empty user data. A string of at most 32 bytes (UTF-8 encoded) is
    /// copied to the front of a 32-byte array whose remaining bytes are ASCII spaces.
    ///
    /// # Errors
    ///
    /// Returns the byte length of the string if it is longer than 32 bytes.
    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
        let Some(text) = opt_str else {
            return Ok(UserData(None));
        };
        let bytes = text.as_bytes();
        let length = bytes.len();
        if length > 32 {
            return Err(length);
        }
        // Start from an all-spaces buffer so the unused tail is space-padded.
        let mut padded = [b' '; 32];
        padded[..length].copy_from_slice(bytes);
        Ok(UserData(Some(padded)))
    }
}
/// The result of `SystemExecutionStateView::create_application`.
#[derive(Debug)]
pub struct CreateApplicationResult {
/// The ID of the newly created application.
pub app_id: ApplicationId,
/// The transaction tracker that was passed in by value, handed back after it has
/// been updated.
pub txn_tracker: TransactionTracker,
}
impl<C> SystemExecutionStateView<C>
where
C: Context + Clone + Send + Sync + 'static,
C::Extra: ExecutionRuntimeContext,
{
/// Returns whether the chain is active: it has a description, its ownership reports
/// itself as active, a committee exists for the current epoch, and an admin chain ID
/// is set.
pub fn is_active(&self) -> bool {
    if self.description.get().is_none() {
        return false;
    }
    if !self.ownership.get().is_active() {
        return false;
    }
    self.current_committee().is_some() && self.admin_id.get().is_some()
}
/// Returns the current epoch together with its committee, or `None` if no epoch is set
/// or no committee is stored for it.
pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
    match self.epoch.get() {
        Some(epoch) => self
            .committees
            .get()
            .get(epoch)
            .map(|committee| (*epoch, committee)),
        None => None,
    }
}
/// Executes a system operation on this chain's state.
///
/// Outgoing messages, events, oracle responses and created blobs are recorded in
/// `txn_tracker`.
///
/// Returns `Some((application_id, instantiation_argument))` for `CreateApplication`,
/// and `None` for every other operation.
///
/// # Errors
///
/// Propagates the error of whichever step fails. In particular, returns
/// `AdminOperationOnNonAdminChain` for an `Admin` operation on a chain whose admin ID
/// is not its own ID, `InvalidCommitteeRemoval` when removing an epoch that has no
/// committee, `InactiveChain` when the admin ID is needed but unset, and
/// `OracleResponseMismatch` when a replayed oracle response is not the expected event.
///
/// # Panics
///
/// `CreateCommittee` and `ProcessNewEpoch` go through `check_next_epoch`, which panics
/// if no current epoch is set.
pub async fn execute_operation(
&mut self,
context: OperationContext,
operation: SystemOperation,
txn_tracker: &mut TransactionTracker,
resource_controller: &mut ResourceController<Option<AccountOwner>>,
) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
use SystemOperation::*;
let mut new_application = None;
match operation {
OpenChain(config) => {
// The ID of the message about to be queued is passed to `open_chain`, which
// derives the child chain ID from it.
let next_message_id = context.next_message_id(txn_tracker.next_message_index());
let message = self.open_chain(config, next_message_id).await?;
txn_tracker.add_outgoing_message(message)?;
#[cfg(with_metrics)]
OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
}
ChangeOwnership {
super_owners,
owners,
multi_leader_rounds,
open_multi_leader_rounds,
timeout_config,
} => {
// The previous ownership is replaced wholesale, not merged.
self.ownership.set(ChainOwnership {
super_owners: super_owners.into_iter().collect(),
owners: owners.into_iter().collect(),
multi_leader_rounds,
open_multi_leader_rounds,
timeout_config,
});
}
ChangeApplicationPermissions(application_permissions) => {
self.application_permissions.set(application_permissions);
}
CloseChain => self.close_chain().await?,
Transfer {
owner,
amount,
recipient,
} => {
// No authenticated application is passed: at the operation level only the
// block signer can authorize the transfer.
let maybe_message = self
.transfer(context.authenticated_signer, None, owner, recipient, amount)
.await?;
txn_tracker.add_outgoing_messages(maybe_message)?;
}
Claim {
owner,
target_id,
recipient,
amount,
} => {
let message = self
.claim(
context.authenticated_signer,
None,
owner,
target_id,
recipient,
amount,
)
.await?;
txn_tracker.add_outgoing_message(message)?;
}
Admin(admin_operation) => {
// Admin operations are only valid on the admin chain itself.
ensure!(
*self.admin_id.get() == Some(context.chain_id),
ExecutionError::AdminOperationOnNonAdminChain
);
match admin_operation {
AdminOperation::PublishCommitteeBlob { blob_hash } => {
self.blob_published(&BlobId::new(blob_hash, BlobType::Committee))?;
}
AdminOperation::CreateCommittee { epoch, blob_hash } => {
// The new epoch must be exactly the current epoch plus one.
self.check_next_epoch(epoch)?;
let blob_id = BlobId::new(blob_hash, BlobType::Committee);
// The committee is the BCS-decoded content of the committee blob.
let committee =
bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
self.blob_used(Some(txn_tracker), blob_id).await?;
self.committees.get_mut().insert(epoch, committee);
self.epoch.set(Some(epoch));
// Announce the epoch: the event index is the epoch number and the payload
// is the BCS-encoded hash of the committee blob.
txn_tracker.add_event(
StreamId::system(EPOCH_STREAM_NAME),
epoch.0,
bcs::to_bytes(&blob_hash)?,
);
}
AdminOperation::RemoveCommittee { epoch } => {
ensure!(
self.committees.get_mut().remove(&epoch).is_some(),
ExecutionError::InvalidCommitteeRemoval
);
// The removal event carries an empty payload; only its index matters.
txn_tracker.add_event(
StreamId::system(REMOVED_EPOCH_STREAM_NAME),
epoch.0,
vec![],
);
}
}
}
PublishModule { module_id } => {
for blob_id in module_id.bytecode_blob_ids() {
self.blob_published(&blob_id)?;
}
}
CreateApplication {
module_id,
parameters,
instantiation_argument,
required_application_ids,
} => {
// `create_application` takes the tracker by value, so it is moved out of the
// reference (leaving a default value behind) and written back on success.
// NOTE(review): if `create_application` fails, `*txn_tracker` is left at its
// default value — confirm callers discard the tracker on error.
let txn_tracker_moved = mem::take(txn_tracker);
let CreateApplicationResult {
app_id,
txn_tracker: txn_tracker_moved,
} = self
.create_application(
context.chain_id,
context.height,
module_id,
parameters,
required_application_ids,
txn_tracker_moved,
)
.await?;
*txn_tracker = txn_tracker_moved;
new_application = Some((app_id, instantiation_argument));
}
PublishDataBlob { blob_hash } => {
self.blob_published(&BlobId::new(blob_hash, BlobType::Data))?;
}
ReadBlob { blob_id } => {
let content = self.read_blob_content(blob_id).await?;
// Only reads of data blobs are charged to the resource controller.
if blob_id.blob_type == BlobType::Data {
resource_controller
.with_state(self)
.await?
.track_blob_read(content.bytes().len() as u64)?;
}
self.blob_used(Some(txn_tracker), blob_id).await?;
}
ProcessNewEpoch(epoch) => {
self.check_next_epoch(epoch)?;
let admin_id = self
.admin_id
.get()
.ok_or_else(|| ExecutionError::InactiveChain)?;
// The event to look up: entry `epoch.0` of the admin chain's epoch stream.
let event_id = EventId {
chain_id: admin_id,
stream_id: StreamId::system(EPOCH_STREAM_NAME),
index: epoch.0,
};
// Use the replayed oracle response if the tracker has one (it must be this
// exact event); otherwise fetch the event from the runtime context.
let bytes = match txn_tracker.next_replayed_oracle_response()? {
None => self.context().extra().get_event(event_id.clone()).await?,
Some(OracleResponse::Event(recorded_event_id, bytes))
if recorded_event_id == event_id =>
{
bytes
}
Some(_) => return Err(ExecutionError::OracleResponseMismatch),
};
// The event payload is the BCS-encoded hash of the committee blob.
let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
self.blob_used(Some(txn_tracker), blob_id).await?;
self.committees.get_mut().insert(epoch, committee);
self.epoch.set(Some(epoch));
}
ProcessRemovedEpoch(epoch) => {
// The committee is removed before the removal event is looked up.
ensure!(
self.committees.get_mut().remove(&epoch).is_some(),
ExecutionError::InvalidCommitteeRemoval
);
let admin_id = self
.admin_id
.get()
.ok_or_else(|| ExecutionError::InactiveChain)?;
let event_id = EventId {
chain_id: admin_id,
stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
index: epoch.0,
};
// Same replay-or-fetch logic as for `ProcessNewEpoch`; the payload is only
// recorded as an oracle response and not interpreted here.
let bytes = match txn_tracker.next_replayed_oracle_response()? {
None => self.context().extra().get_event(event_id.clone()).await?,
Some(OracleResponse::Event(recorded_event_id, bytes))
if recorded_event_id == event_id =>
{
bytes
}
Some(_) => return Err(ExecutionError::OracleResponseMismatch),
};
txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
}
UpdateStreams(streams) => {
for (chain_id, stream_id, next_index) in streams {
let subscriptions = self
.event_subscriptions
.get_mut_or_default(&(chain_id, stream_id))
.await?;
// The stored index never decreases.
subscriptions.next_index = subscriptions.next_index.max(next_index);
}
}
}
Ok(new_application)
}
fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
let expected = self.epoch.get().expect("chain is active").try_add_one()?;
ensure!(
provided == expected,
ExecutionError::InvalidCommitteeEpoch { provided, expected }
);
Ok(())
}
/// Debits `amount` from `source` on this chain and prepares the message crediting the
/// recipient.
///
/// Authorization depends on the source account:
/// * for the chain account (`AccountOwner::CHAIN`), the authenticated signer must be
///   accepted by the chain ownership's `verify_owner`;
/// * for any other account, either the authenticated signer or the authenticated
///   application must be the source itself.
///
/// Returns `Some` tracked `Credit` message addressed to the recipient account's chain,
/// or `None` when the recipient is `Recipient::Burn`.
///
/// # Errors
///
/// Returns `UnauthenticatedTransferOwner` if the transfer is not authorized,
/// `IncorrectTransferAmount` if `amount` is zero, and the error of `debit` (e.g.
/// `InsufficientFunding`) if the source cannot be debited.
pub async fn transfer(
    &mut self,
    authenticated_signer: Option<AccountOwner>,
    authenticated_application_id: Option<ApplicationId>,
    source: AccountOwner,
    recipient: Recipient,
    amount: Amount,
) -> Result<Option<OutgoingMessage>, ExecutionError> {
    let authorized = if source == AccountOwner::CHAIN {
        // `is_some_and` replaces the `is_some() && ...unwrap()` pair: no panic path.
        authenticated_signer.is_some_and(|signer| self.ownership.get().verify_owner(&signer))
    } else {
        authenticated_signer == Some(source)
            || authenticated_application_id.map(AccountOwner::from) == Some(source)
    };
    ensure!(authorized, ExecutionError::UnauthenticatedTransferOwner);
    ensure!(
        amount > Amount::ZERO,
        ExecutionError::IncorrectTransferAmount
    );
    self.debit(&source, amount).await?;
    match recipient {
        Recipient::Account(account) => {
            let message = SystemMessage::Credit {
                amount,
                source,
                target: account.owner,
            };
            Ok(Some(
                OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
            ))
        }
        // Burning: the funds are debited and nothing is sent anywhere.
        Recipient::Burn => Ok(None),
    }
}
pub async fn claim(
&self,
authenticated_signer: Option<AccountOwner>,
authenticated_application_id: Option<ApplicationId>,
source: AccountOwner,
target_id: ChainId,
recipient: Recipient,
amount: Amount,
) -> Result<OutgoingMessage, ExecutionError> {
ensure!(
authenticated_signer == Some(source)
|| authenticated_application_id.map(AccountOwner::from) == Some(source),
ExecutionError::UnauthenticatedClaimOwner
);
ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
let message = SystemMessage::Withdraw {
amount,
owner: source,
recipient,
};
Ok(
OutgoingMessage::new(target_id, message)
.with_authenticated_signer(authenticated_signer),
)
}
/// Subtracts `amount` from the balance of `account`.
///
/// The chain account (`AccountOwner::CHAIN`) is backed by `self.balance`; every other
/// account by its entry in `self.balances`, which is removed once it reaches zero.
///
/// # Errors
///
/// Returns `InsufficientFunding` if the account has no entry (reported with a zero
/// balance) or if the subtraction fails.
async fn debit(
    &mut self,
    account: &AccountOwner,
    amount: Amount,
) -> Result<(), ExecutionError> {
    let is_chain_account = *account == AccountOwner::CHAIN;
    let balance = if is_chain_account {
        self.balance.get_mut()
    } else {
        match self.balances.get_mut(account).await? {
            Some(entry) => entry,
            None => {
                return Err(ExecutionError::InsufficientFunding {
                    balance: Amount::ZERO,
                    account: *account,
                });
            }
        }
    };
    if balance.try_sub_assign(amount).is_err() {
        return Err(ExecutionError::InsufficientFunding {
            balance: *balance,
            account: *account,
        });
    }
    // Drop emptied per-owner entries; the chain balance register always stays.
    if !is_chain_account && balance.is_zero() {
        self.balances.remove(account)?;
    }
    Ok(())
}
pub async fn execute_message(
&mut self,
context: MessageContext,
message: SystemMessage,
) -> Result<Vec<OutgoingMessage>, ExecutionError> {
let mut outcome = Vec::new();
use SystemMessage::*;
match message {
Credit {
amount,
source,
target,
} => {
let receiver = if context.is_bouncing { source } else { target };
if receiver == AccountOwner::CHAIN {
let new_balance = self.balance.get().saturating_add(amount);
self.balance.set(new_balance);
} else {
let balance = self.balances.get_mut_or_default(&receiver).await?;
*balance = balance.saturating_add(amount);
}
}
Withdraw {
amount,
owner,
recipient,
} => {
self.debit(&owner, amount).await?;
match recipient {
Recipient::Account(account) => {
let message = SystemMessage::Credit {
amount,
source: owner,
target: account.owner,
};
outcome.push(
OutgoingMessage::new(account.chain_id, message)
.with_kind(MessageKind::Tracked),
);
}
Recipient::Burn => (),
}
}
OpenChain(_) => {}
ApplicationCreated => {}
}
Ok(outcome)
}
/// Initializes the system state of a new chain from the given configuration.
///
/// The chain description becomes `ChainDescription::Child(message_id)`.
///
/// # Panics
///
/// Panics if the chain already has a description, an active ownership, or any
/// committee.
pub fn initialize_chain(
    &mut self,
    message_id: MessageId,
    timestamp: Timestamp,
    config: OpenChainConfig,
) {
    assert!(self.description.get().is_none());
    assert!(!self.ownership.get().is_active());
    assert!(self.committees.get().is_empty());
    self.description
        .set(Some(ChainDescription::Child(message_id)));
    self.timestamp.set(timestamp);
    self.admin_id.set(Some(config.admin_id));
    self.epoch.set(Some(config.epoch));
    self.committees.set(config.committees);
    self.ownership.set(config.ownership);
    self.balance.set(config.balance);
    self.application_permissions
        .set(config.application_permissions);
}
/// Answers a system query with the chain ID from the query context and the chain's
/// current balance. The outcome schedules no operations.
pub async fn handle_query(
    &mut self,
    context: QueryContext,
    _query: SystemQuery,
) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
    let balance = *self.balance.get();
    let chain_id = context.chain_id;
    Ok(QueryOutcome {
        response: SystemResponse { chain_id, balance },
        operations: Vec::new(),
    })
}
/// Validates the configuration of a new child chain, debits its initial balance from
/// this chain's balance and returns the protected `OpenChain` message addressed to
/// the child chain, whose ID is derived from `next_message_id`.
///
/// # Errors
///
/// Returns `InvalidNewChainAdminId`, `InvalidCommittees` or `InvalidEpoch` (checked in
/// that order) if the configuration's admin ID, committees or epoch differ from this
/// chain's, and the error of `debit` if the chain balance is insufficient.
pub async fn open_chain(
    &mut self,
    config: OpenChainConfig,
    next_message_id: MessageId,
) -> Result<OutgoingMessage, ExecutionError> {
    let child_id = ChainId::child(next_message_id);
    let same_admin = *self.admin_id.get() == Some(config.admin_id);
    ensure!(same_admin, ExecutionError::InvalidNewChainAdminId(child_id));
    let same_committees = *self.committees.get() == config.committees;
    ensure!(same_committees, ExecutionError::InvalidCommittees);
    let same_epoch = *self.epoch.get() == Some(config.epoch);
    ensure!(
        same_epoch,
        ExecutionError::InvalidEpoch {
            chain_id: child_id,
            epoch: config.epoch,
        }
    );
    self.debit(&AccountOwner::CHAIN, config.balance).await?;
    let opening = SystemMessage::OpenChain(Box::new(config));
    let outgoing = OutgoingMessage::new(child_id, opening).with_kind(MessageKind::Protected);
    Ok(outgoing)
}
/// Marks the chain as closed by setting the `closed` flag. Always returns `Ok(())`.
pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
self.closed.set(true);
Ok(())
}
/// Creates a new application from a module.
///
/// The module's bytecode blobs must exist and are recorded as used, every required
/// application is resolved through `describe_application`, and the application
/// description is added to the tracker as a created blob.
///
/// Returns the new application's ID together with the updated tracker.
///
/// # Errors
///
/// Returns `BlobsNotFound` if bytecode blobs are missing, and propagates any error
/// raised while resolving the required applications.
pub async fn create_application(
    &mut self,
    chain_id: ChainId,
    block_height: BlockHeight,
    module_id: ModuleId,
    parameters: Vec<u8>,
    required_application_ids: Vec<ApplicationId>,
    mut txn_tracker: TransactionTracker,
) -> Result<CreateApplicationResult, ExecutionError> {
    // The index is taken from the tracker before anything else touches it.
    let application_index = txn_tracker.next_application_index();
    for bytecode_blob_id in self.check_bytecode_blobs(&module_id).await? {
        self.blob_used(Some(&mut txn_tracker), bytecode_blob_id)
            .await?;
    }
    let description = ApplicationDescription {
        module_id,
        creator_chain_id: chain_id,
        block_height,
        application_index,
        parameters,
        required_application_ids,
    };
    self.check_required_applications(&description, Some(&mut txn_tracker))
        .await?;
    let description_blob = Blob::new_application_description(&description);
    txn_tracker.add_created_blob(description_blob);
    let app_id = ApplicationId::from(&description);
    Ok(CreateApplicationResult {
        app_id,
        txn_tracker,
    })
}
/// Resolves every application required by `application_description` through
/// `describe_application`, failing on the first one that cannot be described.
async fn check_required_applications(
    &mut self,
    application_description: &ApplicationDescription,
    mut txn_tracker: Option<&mut TransactionTracker>,
) -> Result<(), ExecutionError> {
    let required_ids = &application_description.required_application_ids;
    for &required_id in required_ids {
        // `describe_application` calls back into this function, so the recursive
        // future has to be boxed.
        Box::pin(self.describe_application(required_id, txn_tracker.as_deref_mut())).await?;
    }
    Ok(())
}
/// Returns the description of the application with the given ID.
///
/// The description blob is taken from the tracker's created blobs when present there,
/// and read from storage otherwise. The description blob and the module's bytecode
/// blobs are recorded as used, and the required applications are resolved
/// recursively.
///
/// # Errors
///
/// Returns `BlobsNotFound` if the description blob or bytecode blobs are missing, and
/// a deserialization error if the blob content is not a valid description.
pub async fn describe_application(
    &mut self,
    id: ApplicationId,
    mut txn_tracker: Option<&mut TransactionTracker>,
) -> Result<ApplicationDescription, ExecutionError> {
    let blob_id = id.description_blob_id();
    // Prefer a description created earlier in the same transaction.
    let created_locally = txn_tracker
        .as_ref()
        .and_then(|tracker| tracker.created_blobs().get(&blob_id))
        .map(|blob| blob.content().clone());
    let blob_content = match created_locally {
        Some(content) => content,
        None => self.read_blob_content(blob_id).await?,
    };
    self.blob_used(txn_tracker.as_deref_mut(), blob_id).await?;
    let description = bcs::from_bytes::<ApplicationDescription>(blob_content.bytes())?;
    for bytecode_blob_id in self.check_bytecode_blobs(&description.module_id).await? {
        self.blob_used(txn_tracker.as_deref_mut(), bytecode_blob_id)
            .await?;
    }
    self.check_required_applications(&description, txn_tracker)
        .await?;
    Ok(description)
}
/// Returns the given applications together with all their transitive dependencies,
/// ordered so that every application comes after the applications it requires.
///
/// `stack` holds the applications still to visit. Each application is visited twice:
/// the first visit schedules its unseen dependencies on top of it, the second visit
/// (once those dependencies are done) appends it to the result.
pub async fn find_dependencies(
    &mut self,
    mut stack: Vec<ApplicationId>,
    txn_tracker: &mut TransactionTracker,
) -> Result<Vec<ApplicationId>, ExecutionError> {
    // The final ordering.
    let mut ordered = Vec::new();
    // Entries already appended to `ordered`.
    let mut emitted = HashSet::new();
    // Entries whose dependencies have already been scheduled once.
    let mut expanded = HashSet::new();
    while let Some(id) = stack.pop() {
        if emitted.contains(&id) {
            continue;
        }
        if !expanded.insert(id) {
            // Second visit: everything this entry requires is already in `ordered`.
            emitted.insert(id);
            ordered.push(id);
            continue;
        }
        // First visit: come back to this entry after its dependencies.
        stack.push(id);
        let description = self.describe_application(id, Some(txn_tracker)).await?;
        stack.extend(
            description
                .required_application_ids
                .iter()
                .rev()
                .filter(|dependency| !expanded.contains(*dependency))
                .copied(),
        );
    }
    Ok(ordered)
}
/// Records a blob as used on this chain.
///
/// Returns `false` without doing anything if the blob was already recorded.
/// Otherwise the blob ID is added to `used_blobs`, an `OracleResponse::Blob` is passed
/// to the tracker's `replay_oracle_response` (when a tracker is given), and `true` is
/// returned.
pub(crate) async fn blob_used(
    &mut self,
    maybe_txn_tracker: Option<&mut TransactionTracker>,
    blob_id: BlobId,
) -> Result<bool, ExecutionError> {
    let already_used = self.used_blobs.contains(&blob_id).await?;
    if !already_used {
        self.used_blobs.insert(&blob_id)?;
        if let Some(tracker) = maybe_txn_tracker {
            tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
        }
    }
    Ok(!already_used)
}
/// Records a blob published on this chain by adding its ID to `used_blobs`. Unlike
/// `blob_used`, this does not involve the transaction tracker.
fn blob_published(&mut self, blob_id: &BlobId) -> Result<(), ExecutionError> {
self.used_blobs.insert(blob_id)?;
Ok(())
}
/// Reads the content of a blob through the runtime context.
///
/// # Errors
///
/// Returns `ExecutionError::BlobsNotFound` naming `blob_id` if the lookup reports
/// `ViewError::BlobsNotFound`; any other error is converted as is.
pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
    let lookup = self.context().extra().get_blob(blob_id).await;
    let blob = lookup.map_err(|error| match error {
        ViewError::BlobsNotFound(_) => ExecutionError::BlobsNotFound(vec![blob_id]),
        other => ExecutionError::from(other),
    })?;
    Ok(blob.into())
}
pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
if self.context().extra().contains_blob(blob_id).await? {
Ok(())
} else {
Err(ExecutionError::BlobsNotFound(vec![blob_id]))
}
}
async fn check_bytecode_blobs(
&mut self,
module_id: &ModuleId,
) -> Result<Vec<BlobId>, ExecutionError> {
let blob_ids = module_id.bytecode_blob_ids();
let mut missing_blobs = Vec::new();
for blob_id in &blob_ids {
if !self.context().extra().contains_blob(*blob_id).await? {
missing_blobs.push(*blob_id);
}
}
ensure!(
missing_blobs.is_empty(),
ExecutionError::BlobsNotFound(missing_blobs)
);
Ok(blob_ids)
}
}