#[cfg(test)]
#[path = "./unit_tests/system_tests.rs"]
mod tests;
#[cfg(with_metrics)]
use std::sync::LazyLock;
use std::{
collections::BTreeMap,
fmt::{self, Display, Formatter},
iter,
};
use async_graphql::Enum;
use custom_debug_derive::Debug;
use linera_base::{
crypto::CryptoHash,
data_types::{
Amount, ApplicationPermissions, ArithmeticError, BlobContent, OracleResponse, Timestamp,
},
ensure, hex_debug,
identifiers::{
Account, AccountOwner, BlobId, BlobType, BytecodeId, ChainDescription, ChainId,
ChannelFullName, MessageId, Owner,
},
ownership::{ChainOwnership, TimeoutConfig},
};
use linera_views::{
context::Context,
map_view::HashedMapView,
register_view::HashedRegisterView,
set_view::HashedSetView,
views::{ClonableView, HashableView, View, ViewError},
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
#[cfg(with_metrics)]
use {linera_base::prometheus_util::register_int_counter_vec, prometheus::IntCounterVec};
#[cfg(test)]
use crate::test_utils::SystemExecutionState;
use crate::{
committee::{Committee, Epoch},
ApplicationRegistryView, ChannelName, ChannelSubscription, Destination,
ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, QueryContext,
QueryOutcome, RawExecutionOutcome, RawOutgoingMessage, TransactionTracker,
UserApplicationDescription, UserApplicationId,
};
/// Index `0`, associated with the `OpenChain` message.
///
/// NOTE(review): `open_chain` returns the `OpenChain` message as the first of its two messages,
/// so this presumably is the offset of that message among those emitted by the operation —
/// confirm against the callers.
pub static OPEN_CHAIN_MESSAGE_INDEX: u32 = 0;
/// Index `0`, associated with the message emitted when an application is created.
///
/// NOTE(review): `create_application` emits a single `ApplicationCreated` message, so this
/// presumably is its offset among the messages of the operation — confirm against the callers.
pub static CREATE_APPLICATION_MESSAGE_INDEX: u32 = 0;
/// Lazily registered counter vector named `open_chain_count`, declared without any labels.
///
/// It is only compiled when `with_metrics` is set and is incremented each time an `OpenChain`
/// operation is executed successfully by `execute_operation`.
#[cfg(with_metrics)]
static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"open_chain_count",
"The number of times the `OpenChain` operation was executed",
&[],
)
});
/// A view over the execution state of the system application of a chain.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
/// How the chain was created; `None` until the chain is initialized.
pub description: HashedRegisterView<C, Option<ChainDescription>>,
/// The current epoch of the chain, if any.
pub epoch: HashedRegisterView<C, Option<Epoch>>,
/// The ID of the admin chain this chain follows, if any.
pub admin_id: HashedRegisterView<C, Option<ChainId>>,
/// The channel subscriptions recorded for this chain.
pub subscriptions: HashedSetView<C, ChannelSubscription>,
/// The committees known to this chain, keyed by epoch.
pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
/// The ownership configuration of the chain.
pub ownership: HashedRegisterView<C, ChainOwnership>,
/// The balance of the chain itself (used when no account owner is specified).
pub balance: HashedRegisterView<C, Amount>,
/// The balances of individual account owners; entries are removed when they reach zero.
pub balances: HashedMapView<C, AccountOwner, Amount>,
/// The timestamp recorded for the chain (set by `initialize_chain`).
pub timestamp: HashedRegisterView<C, Timestamp>,
/// The registry of applications known to this chain.
pub registry: ApplicationRegistryView<C>,
/// Set to `true` by `close_chain`.
pub closed: HashedRegisterView<C, bool>,
/// The application permissions configured for this chain.
pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
/// The IDs of blobs that were published or used on this chain.
pub used_blobs: HashedSetView<C, BlobId>,
}
/// The configuration of a new chain, carried by the `OpenChain` operation and message and
/// applied by `initialize_chain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
/// The ownership configuration of the new chain.
pub ownership: ChainOwnership,
/// The admin chain of the new chain; `open_chain` requires it to match the parent's.
pub admin_id: ChainId,
/// The epoch of the new chain; `open_chain` requires it to match the parent's.
pub epoch: Epoch,
/// The committees of the new chain; `open_chain` requires them to match the parent's.
pub committees: BTreeMap<Epoch, Committee>,
/// The initial balance of the new chain, debited from the parent chain's balance.
pub balance: Amount,
/// The application permissions of the new chain.
pub application_permissions: ApplicationPermissions,
}
/// An operation handled by the system application (see `execute_operation`).
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
/// Debits `amount` from `owner`'s account (or from the chain balance if `owner` is `None`)
/// and, unless the recipient is `Recipient::Burn`, sends a `Credit` message to the recipient.
Transfer {
#[debug(skip_if = Option::is_none)]
owner: Option<Owner>,
recipient: Recipient,
amount: Amount,
},
/// Sends an authenticated `Withdraw` message to chain `target_id`, asking it to move
/// `amount` from `owner`'s account there to `recipient`.
Claim {
owner: Owner,
target_id: ChainId,
recipient: Recipient,
amount: Amount,
},
/// Opens a new child chain with the given configuration.
OpenChain(OpenChainConfig),
/// Unsubscribes from all channels and marks the chain as closed.
CloseChain,
/// Replaces the ownership configuration of the chain.
ChangeOwnership {
#[debug(skip_if = Vec::is_empty)]
super_owners: Vec<Owner>,
#[debug(skip_if = Vec::is_empty)]
owners: Vec<(Owner, u64)>,
multi_leader_rounds: u32,
open_multi_leader_rounds: bool,
timeout_config: TimeoutConfig,
},
/// Replaces the application permissions of the chain.
ChangeApplicationPermissions(ApplicationPermissions),
/// Subscribes this chain to a system channel of chain `chain_id`.
Subscribe {
chain_id: ChainId,
channel: SystemChannel,
},
/// Removes an existing subscription to a system channel of chain `chain_id`.
Unsubscribe {
chain_id: ChainId,
channel: SystemChannel,
},
/// Records the contract and service bytecode blobs of `bytecode_id` as used on this chain.
PublishBytecode { bytecode_id: BytecodeId },
/// Records the data blob with the given hash as used on this chain.
PublishDataBlob { blob_hash: CryptoHash },
/// Reads the content of a blob and records the blob as used on this chain.
ReadBlob { blob_id: BlobId },
/// Creates a new application from `bytecode_id`; `instantiation_argument` is returned to the
/// caller of `execute_operation` together with the new application ID.
CreateApplication {
bytecode_id: BytecodeId,
#[serde(with = "serde_bytes")]
#[debug(with = "hex_debug")]
parameters: Vec<u8>,
#[serde(with = "serde_bytes")]
#[debug(with = "hex_debug", skip_if = Vec::is_empty)]
instantiation_argument: Vec<u8>,
#[debug(skip_if = Vec::is_empty)]
required_application_ids: Vec<UserApplicationId>,
},
/// Sends a `RequestApplication` message for `application_id` to chain `chain_id`.
RequestApplication {
chain_id: ChainId,
application_id: UserApplicationId,
},
/// An operation that is only accepted on the admin chain.
Admin(AdminOperation),
}
/// Operations that `execute_operation` only accepts when the executing chain is its own admin
/// chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
/// Registers `committee` for `epoch`, which must be the epoch right after the current one,
/// and makes it the current epoch.
CreateCommittee { epoch: Epoch, committee: Committee },
/// Removes the committee registered for `epoch`; fails if there is none.
RemoveCommittee { epoch: Epoch },
}
/// A message handled by the system application (see `execute_message`).
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
/// Credits `amount` to `target` (or to the chain balance if `target` is `None`). When the
/// message is bouncing, `source` is credited instead.
Credit {
#[debug(skip_if = Option::is_none)]
target: Option<AccountOwner>,
amount: Amount,
#[debug(skip_if = Option::is_none)]
source: Option<AccountOwner>,
},
/// Debits `amount` from `owner`'s account and, unless the recipient is `Recipient::Burn`,
/// sends a `Credit` message to `recipient`.
Withdraw {
owner: AccountOwner,
amount: Amount,
recipient: Recipient,
},
/// Carries the configuration of a newly opened chain; ignored by `execute_message`.
OpenChain(OpenChainConfig),
/// Announces a new committee for `epoch`.
CreateCommittee { epoch: Epoch, committee: Committee },
/// Announces the removal of the committee of `epoch`.
RemoveCommittee { epoch: Epoch },
/// Announces that chain `id` subscribed to a channel; ignored by `execute_message`.
Subscribe {
id: ChainId,
subscription: ChannelSubscription,
},
/// Announces that chain `id` unsubscribed from a channel; ignored by `execute_message`.
Unsubscribe {
id: ChainId,
subscription: ChannelSubscription,
},
/// Emitted when an application is created; ignored by `execute_message`.
ApplicationCreated,
/// Asks the receiving chain to register the given applications.
RegisterApplications {
applications: Vec<UserApplicationDescription>,
},
/// Asks the receiving chain to reply with a `RegisterApplications` message describing the
/// application and its dependencies.
RequestApplication(UserApplicationId),
}
/// A query to the system application; it carries no data and is answered by `handle_query`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
/// The response to a [`SystemQuery`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
/// The ID of the queried chain.
pub chain_id: ChainId,
/// The chain balance at the time of the query.
pub balance: Amount,
}
// The channels offered by the system application.
//
// NOTE(review): plain `//` comments are used on purpose; the GraphQL and clap derives may turn
// `///` doc comments into user-visible descriptions.
#[derive(
Enum, Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, clap::ValueEnum,
)]
pub enum SystemChannel {
// The channel on which committee creations and removals are sent to subscribers.
Admin,
}
impl SystemChannel {
    /// Returns the name of the channel, obtained by converting the BCS serialization of
    /// `self` into a [`ChannelName`].
    pub fn name(&self) -> ChannelName {
        let serialized = bcs::to_bytes(self).expect("`SystemChannel` can be serialized");
        serialized.into()
    }

    /// Returns the full name of the channel, built from [`Self::name`] as a system channel.
    pub fn full_name(&self) -> ChannelFullName {
        let name = self.name();
        ChannelFullName::system(name)
    }
}
impl Display for SystemChannel {
    /// Writes the variant name of the channel, ignoring any width or padding flags.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        match self {
            SystemChannel::Admin => formatter.write_str("Admin"),
        }
    }
}
/// The recipient of a transfer or withdrawal.
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
pub enum Recipient {
/// The amount is debited from the sender and no `Credit` message is produced.
Burn,
/// The amount is credited to the given account by a `Credit` message sent to its chain.
Account(Account),
}
impl Recipient {
    /// Returns the recipient wrapping the account that `Account::chain` builds for `chain_id`.
    pub fn chain(chain_id: ChainId) -> Recipient {
        let account = Account::chain(chain_id);
        Self::Account(account)
    }

    /// Returns the recipient for the root chain with the given index (testing only).
    #[cfg(with_testing)]
    pub fn root(index: u32) -> Recipient {
        let chain_id = ChainId::root(index);
        Self::chain(chain_id)
    }
}
/// Optional user data, stored as a fixed-size array of 32 bytes.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
impl UserData {
    /// Builds user data from an optional string.
    ///
    /// `None` yields `UserData(None)`. A string of at most 32 bytes is copied into a 32-byte
    /// array and padded on the right with ASCII spaces.
    ///
    /// # Errors
    ///
    /// Returns the byte length of the string if it is longer than 32 bytes.
    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
        let Some(text) = opt_str else {
            return Ok(UserData(None));
        };
        let bytes = text.as_bytes();
        let length = bytes.len();
        if length > 32 {
            return Err(length);
        }
        let mut padded = [b' '; 32];
        padded[..length].copy_from_slice(bytes);
        Ok(UserData(Some(padded)))
    }
}
/// The result of `create_application`.
#[derive(Clone, Debug)]
pub struct CreateApplicationResult {
/// The ID of the newly created application.
pub app_id: UserApplicationId,
/// The `ApplicationCreated` message to send.
pub message: RawOutgoingMessage<SystemMessage, Amount>,
/// The bytecode blobs that were not yet recorded as used on the chain.
pub blobs_to_register: Vec<BlobId>,
}
/// The errors returned by the system application.
#[derive(Error, Debug)]
pub enum SystemExecutionError {
#[error(transparent)]
ArithmeticError(#[from] ArithmeticError),
// No `#[from]` here: the hand-written `From<ViewError>` impl maps
// `ViewError::BlobsNotFound` to `BlobsNotFound` instead of wrapping it.
#[error(transparent)]
ViewError(ViewError),
#[error("Invalid admin ID in new chain: {0}")]
InvalidNewChainAdminId(ChainId),
#[error("Invalid committees")]
InvalidCommittees,
#[error("{epoch:?} is not recognized by chain {chain_id:}")]
InvalidEpoch { chain_id: ChainId, epoch: Epoch },
#[error("Transfer must have positive amount")]
IncorrectTransferAmount,
#[error("Transfer from owned account must be authenticated by the right signer")]
UnauthenticatedTransferOwner,
#[error("The transferred amount must not exceed the current chain balance: {balance}")]
InsufficientFunding { balance: Amount },
#[error("Required execution fees exceeded the total funding available: {balance}")]
InsufficientFundingForFees { balance: Amount },
#[error("Claim must have positive amount")]
IncorrectClaimAmount,
#[error("Claim must be authenticated by the right signer")]
UnauthenticatedClaimOwner,
#[error("Admin operations are only allowed on the admin chain.")]
AdminOperationOnNonAdminChain,
#[error("Failed to create new committee")]
InvalidCommitteeCreation,
#[error("Failed to remove committee")]
InvalidCommitteeRemoval,
#[error(
"Chain {0} tried to subscribe to the admin channel ({1}) of a chain that is not the admin chain"
)]
InvalidAdminSubscription(ChainId, SystemChannel),
#[error("Cannot subscribe to a channel ({1}) on the same chain ({0})")]
SelfSubscription(ChainId, SystemChannel),
#[error("Chain {0} tried to subscribe to channel {1} but it is already subscribed")]
AlreadySubscribedToChannel(ChainId, SystemChannel),
#[error("Invalid unsubscription request to channel {1} on chain {0}")]
InvalidUnsubscription(ChainId, SystemChannel),
#[error("Amount overflow")]
AmountOverflow,
#[error("Amount underflow")]
AmountUnderflow,
#[error("Chain balance overflow")]
BalanceOverflow,
#[error("Chain balance underflow")]
BalanceUnderflow,
#[error("Cannot set epoch to a lower value")]
CannotRewindEpoch,
#[error("Cannot decrease the chain's timestamp")]
TicksOutOfOrder,
#[error("Application {0:?} is not registered by the chain")]
UnknownApplicationId(Box<UserApplicationId>),
#[error("Chain is not active yet.")]
InactiveChain,
// Lists the IDs of the blobs that could not be found.
#[error("Blobs not found: {0:?}")]
BlobsNotFound(Vec<BlobId>),
#[error("Oracle response mismatch")]
OracleResponseMismatch,
#[error("No recorded response for oracle query")]
MissingOracleResponse,
}
impl From<ViewError> for SystemExecutionError {
    /// Converts a view error, surfacing missing blobs as the dedicated `BlobsNotFound` variant
    /// and wrapping every other error in `ViewError`.
    fn from(error: ViewError) -> Self {
        match error {
            ViewError::BlobsNotFound(missing) => Self::BlobsNotFound(missing),
            other => Self::ViewError(other),
        }
    }
}
impl<C> SystemExecutionStateView<C>
where
C: Context + Clone + Send + Sync + 'static,
C::Extra: ExecutionRuntimeContext,
{
/// Returns whether the chain is active: it has a description, an active ownership, a
/// committee for the current epoch and an admin chain.
pub fn is_active(&self) -> bool {
    if self.description.get().is_none() {
        return false;
    }
    if !self.ownership.get().is_active() {
        return false;
    }
    if self.current_committee().is_none() {
        return false;
    }
    self.admin_id.get().is_some()
}
/// Returns the current epoch together with its committee, or `None` if no epoch is set or no
/// committee is registered for it.
pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
    let current_epoch = self.epoch.get().as_ref()?;
    self.committees
        .get()
        .get(current_epoch)
        .map(|committee| (*current_epoch, committee))
}
/// Executes a system operation on this chain.
///
/// The messages produced by the operation are collected into a `RawExecutionOutcome` that is
/// handed to `txn_tracker` once the operation has succeeded.
///
/// Returns `Some((application_id, instantiation_argument))` when the operation created a new
/// application, and `None` for every other operation.
///
/// # Errors
///
/// Returns a `SystemExecutionError` if the operation is invalid for the current state, e.g.
/// an admin operation on a non-admin chain, an invalid subscription, insufficient funds or
/// missing blobs. `AdminOperation::CreateCommittee` returns
/// `SystemExecutionError::InactiveChain` (instead of panicking) if the chain has no epoch.
pub async fn execute_operation(
    &mut self,
    context: OperationContext,
    operation: SystemOperation,
    txn_tracker: &mut TransactionTracker,
) -> Result<Option<(UserApplicationId, Vec<u8>)>, SystemExecutionError> {
    use SystemOperation::*;
    let mut outcome = RawExecutionOutcome {
        authenticated_signer: context.authenticated_signer,
        refund_grant_to: context.refund_grant_to(),
        ..RawExecutionOutcome::default()
    };
    let mut new_application = None;
    match operation {
        OpenChain(config) => {
            let next_message_id = context.next_message_id(txn_tracker.next_message_index());
            let messages = self.open_chain(config, next_message_id).await?;
            outcome.messages.extend(messages);
            #[cfg(with_metrics)]
            OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
        }
        ChangeOwnership {
            super_owners,
            owners,
            multi_leader_rounds,
            open_multi_leader_rounds,
            timeout_config,
        } => {
            self.ownership.set(ChainOwnership {
                super_owners: super_owners.into_iter().collect(),
                owners: owners.into_iter().collect(),
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            });
        }
        ChangeApplicationPermissions(application_permissions) => {
            self.application_permissions.set(application_permissions);
        }
        CloseChain => {
            let messages = self.close_chain(context.chain_id).await?;
            outcome.messages.extend(messages);
        }
        Transfer {
            owner,
            amount,
            recipient,
            ..
        } => {
            let message = self
                .transfer(
                    context.authenticated_signer,
                    None,
                    owner.map(AccountOwner::User),
                    recipient,
                    amount,
                )
                .await?;
            // Burning produces no message.
            if let Some(message) = message {
                outcome.messages.push(message)
            }
        }
        Claim {
            owner,
            target_id,
            recipient,
            amount,
        } => {
            let message = self
                .claim(
                    context.authenticated_signer,
                    None,
                    AccountOwner::User(owner),
                    target_id,
                    recipient,
                    amount,
                )
                .await?;
            outcome.messages.push(message)
        }
        Admin(admin_operation) => {
            ensure!(
                *self.admin_id.get() == Some(context.chain_id),
                SystemExecutionError::AdminOperationOnNonAdminChain
            );
            match admin_operation {
                AdminOperation::CreateCommittee { epoch, committee } => {
                    // A chain without an epoch is reported as inactive rather than
                    // aborting the process with a panic.
                    let current_epoch = self
                        .epoch
                        .get()
                        .ok_or(SystemExecutionError::InactiveChain)?;
                    ensure!(
                        epoch == current_epoch.try_add_one()?,
                        SystemExecutionError::InvalidCommitteeCreation
                    );
                    self.committees.get_mut().insert(epoch, committee.clone());
                    self.epoch.set(Some(epoch));
                    let message = RawOutgoingMessage {
                        destination: Destination::Subscribers(SystemChannel::Admin.name()),
                        authenticated: false,
                        grant: Amount::ZERO,
                        kind: MessageKind::Protected,
                        message: SystemMessage::CreateCommittee { epoch, committee },
                    };
                    outcome.messages.push(message);
                }
                AdminOperation::RemoveCommittee { epoch } => {
                    ensure!(
                        self.committees.get_mut().remove(&epoch).is_some(),
                        SystemExecutionError::InvalidCommitteeRemoval
                    );
                    let message = RawOutgoingMessage {
                        destination: Destination::Subscribers(SystemChannel::Admin.name()),
                        authenticated: false,
                        grant: Amount::ZERO,
                        kind: MessageKind::Protected,
                        message: SystemMessage::RemoveCommittee { epoch },
                    };
                    outcome.messages.push(message);
                }
            }
        }
        Subscribe { chain_id, channel } => {
            ensure!(
                context.chain_id != chain_id,
                SystemExecutionError::SelfSubscription(context.chain_id, channel)
            );
            // The admin channel may only be followed on the chain's own admin chain.
            if channel == SystemChannel::Admin {
                ensure!(
                    self.admin_id.get().as_ref() == Some(&chain_id),
                    SystemExecutionError::InvalidAdminSubscription(context.chain_id, channel)
                );
            }
            let subscription = ChannelSubscription {
                chain_id,
                name: channel.name(),
            };
            ensure!(
                !self.subscriptions.contains(&subscription).await?,
                SystemExecutionError::AlreadySubscribedToChannel(context.chain_id, channel)
            );
            self.subscriptions.insert(&subscription)?;
            let message = RawOutgoingMessage {
                destination: Destination::Recipient(chain_id),
                authenticated: false,
                grant: Amount::ZERO,
                kind: MessageKind::Protected,
                message: SystemMessage::Subscribe {
                    id: context.chain_id,
                    subscription,
                },
            };
            outcome.messages.push(message);
        }
        Unsubscribe { chain_id, channel } => {
            let subscription = ChannelSubscription {
                chain_id,
                name: channel.name(),
            };
            ensure!(
                self.subscriptions.contains(&subscription).await?,
                SystemExecutionError::InvalidUnsubscription(context.chain_id, channel)
            );
            self.subscriptions.remove(&subscription)?;
            let message = RawOutgoingMessage {
                destination: Destination::Recipient(chain_id),
                authenticated: false,
                grant: Amount::ZERO,
                kind: MessageKind::Protected,
                message: SystemMessage::Unsubscribe {
                    id: context.chain_id,
                    subscription,
                },
            };
            outcome.messages.push(message);
        }
        PublishBytecode { bytecode_id } => {
            self.blob_published(&BlobId::new(
                bytecode_id.contract_blob_hash,
                BlobType::ContractBytecode,
            ))?;
            self.blob_published(&BlobId::new(
                bytecode_id.service_blob_hash,
                BlobType::ServiceBytecode,
            ))?;
        }
        CreateApplication {
            bytecode_id,
            parameters,
            instantiation_argument,
            required_application_ids,
        } => {
            let next_message_id = context.next_message_id(txn_tracker.next_message_index());
            let CreateApplicationResult {
                app_id,
                message,
                blobs_to_register,
            } = self
                .create_application(
                    next_message_id,
                    bytecode_id,
                    parameters,
                    required_application_ids,
                )
                .await?;
            self.record_bytecode_blobs(blobs_to_register, txn_tracker)
                .await?;
            outcome.messages.push(message);
            new_application = Some((app_id, instantiation_argument));
        }
        RequestApplication {
            chain_id,
            application_id,
        } => {
            let message = RawOutgoingMessage {
                destination: Destination::Recipient(chain_id),
                authenticated: false,
                grant: Amount::ZERO,
                kind: MessageKind::Simple,
                message: SystemMessage::RequestApplication(application_id),
            };
            outcome.messages.push(message);
        }
        PublishDataBlob { blob_hash } => {
            self.blob_published(&BlobId::new(blob_hash, BlobType::Data))?;
        }
        ReadBlob { blob_id } => {
            // Reading first makes a missing blob fail before it is recorded as used.
            self.read_blob_content(blob_id).await?;
            self.blob_used(Some(txn_tracker), blob_id).await?;
        }
    }
    txn_tracker.add_system_outcome(outcome)?;
    Ok(new_application)
}
/// Debits `amount` from `source` (or from the chain balance if `source` is `None`) and
/// returns the `Credit` message to send to the recipient, or `None` if the amount is burned.
///
/// The transfer is authorized when one of the following holds:
/// - `source` is a user and equals the authenticated signer;
/// - `source` is an application and equals the authenticated application;
/// - `source` is `None` and the authenticated signer is accepted by the chain ownership.
///
/// # Errors
///
/// Returns `UnauthenticatedTransferOwner` if the transfer is not authorized,
/// `IncorrectTransferAmount` if `amount` is zero, or the error of the debit.
pub async fn transfer(
    &mut self,
    authenticated_signer: Option<Owner>,
    authenticated_application_id: Option<UserApplicationId>,
    source: Option<AccountOwner>,
    recipient: Recipient,
    amount: Amount,
) -> Result<Option<RawOutgoingMessage<SystemMessage, Amount>>, SystemExecutionError> {
    let is_authorized = match (source, authenticated_signer, authenticated_application_id) {
        (Some(AccountOwner::User(owner)), Some(signer), _) => signer == owner,
        (
            Some(AccountOwner::Application(account_application)),
            _,
            Some(authorized_application),
        ) => account_application == authorized_application,
        (None, Some(signer), _) => self.ownership.get().verify_owner(&signer),
        _ => false,
    };
    if !is_authorized {
        return Err(SystemExecutionError::UnauthenticatedTransferOwner);
    }
    if !(amount > Amount::ZERO) {
        return Err(SystemExecutionError::IncorrectTransferAmount);
    }
    self.debit(source.as_ref(), amount).await?;
    let Recipient::Account(account) = recipient else {
        // Burned funds are debited without crediting anyone.
        return Ok(None);
    };
    let credit = SystemMessage::Credit {
        amount,
        source,
        target: account.owner,
    };
    Ok(Some(RawOutgoingMessage {
        destination: Destination::Recipient(account.chain_id),
        authenticated: false,
        grant: Amount::ZERO,
        kind: MessageKind::Tracked,
        message: credit,
    }))
}
pub async fn claim(
&self,
authenticated_signer: Option<Owner>,
authenticated_application_id: Option<UserApplicationId>,
source: AccountOwner,
target_id: ChainId,
recipient: Recipient,
amount: Amount,
) -> Result<RawOutgoingMessage<SystemMessage, Amount>, SystemExecutionError> {
match source {
AccountOwner::User(owner) => ensure!(
authenticated_signer == Some(owner),
SystemExecutionError::UnauthenticatedClaimOwner
),
AccountOwner::Application(owner) => ensure!(
authenticated_application_id == Some(owner),
SystemExecutionError::UnauthenticatedClaimOwner
),
}
ensure!(
amount > Amount::ZERO,
SystemExecutionError::IncorrectClaimAmount
);
Ok(RawOutgoingMessage {
destination: Destination::Recipient(target_id),
authenticated: true,
grant: Amount::ZERO,
kind: MessageKind::Simple,
message: SystemMessage::Withdraw {
amount,
owner: source,
recipient,
},
})
}
/// Subtracts `amount` from the balance of `account`, or from the chain balance if `account`
/// is `None`. An owner's entry is removed once its balance reaches zero.
///
/// # Errors
///
/// Returns `InsufficientFunding` if the owner has no balance entry (reported with a zero
/// balance) or if the subtraction fails.
async fn debit(
    &mut self,
    account: Option<&AccountOwner>,
    amount: Amount,
) -> Result<(), SystemExecutionError> {
    let funds = match account {
        Some(owner) => {
            let entry = self.balances.get_mut(owner).await?;
            entry.ok_or(SystemExecutionError::InsufficientFunding {
                balance: Amount::ZERO,
            })?
        }
        None => self.balance.get_mut(),
    };
    if funds.try_sub_assign(amount).is_err() {
        return Err(SystemExecutionError::InsufficientFunding { balance: *funds });
    }
    match account {
        // Do not keep empty entries in the map of owner balances.
        Some(owner) if funds.is_zero() => {
            self.balances.remove(owner)?;
        }
        _ => {}
    }
    Ok(())
}
/// Executes a system message received by this chain and returns the resulting outcome,
/// including any messages to send in response.
///
/// `Subscribe`, `Unsubscribe`, `OpenChain` and `ApplicationCreated` messages are accepted
/// without any effect on this state.
///
/// # Errors
///
/// Returns a `SystemExecutionError` if a withdrawal cannot be funded, if a `CreateCommittee`
/// message skips an epoch, or if a view or blob lookup fails. A `CreateCommittee` message
/// received while the chain has no epoch yields `SystemExecutionError::InactiveChain`
/// (instead of panicking).
pub async fn execute_message(
    &mut self,
    context: MessageContext,
    message: SystemMessage,
    txn_tracker: &mut TransactionTracker,
) -> Result<RawExecutionOutcome<SystemMessage, Amount>, SystemExecutionError> {
    let mut outcome = RawExecutionOutcome::default();
    use SystemMessage::*;
    match message {
        Credit {
            amount,
            source,
            target,
        } => {
            // A bouncing credit is refunded to its sender.
            let receiver = if context.is_bouncing { source } else { target };
            match receiver {
                None => {
                    let new_balance = self.balance.get().saturating_add(amount);
                    self.balance.set(new_balance);
                }
                Some(owner) => {
                    let balance = self.balances.get_mut_or_default(&owner).await?;
                    *balance = balance.saturating_add(amount);
                }
            }
        }
        Withdraw {
            amount,
            owner,
            recipient,
        } => {
            self.debit(Some(&owner), amount).await?;
            match recipient {
                Recipient::Account(account) => {
                    let message = RawOutgoingMessage {
                        destination: Destination::Recipient(account.chain_id),
                        authenticated: false,
                        grant: Amount::ZERO,
                        kind: MessageKind::Tracked,
                        message: SystemMessage::Credit {
                            amount,
                            source: Some(owner),
                            target: account.owner,
                        },
                    };
                    outcome.messages.push(message);
                }
                Recipient::Burn => (),
            }
        }
        CreateCommittee { epoch, committee } => {
            // A chain without an epoch is reported as inactive rather than aborting the
            // process with a panic.
            let current_epoch = self
                .epoch
                .get()
                .ok_or(SystemExecutionError::InactiveChain)?;
            let chain_next_epoch = current_epoch.try_add_one()?;
            ensure!(
                epoch <= chain_next_epoch,
                SystemExecutionError::InvalidCommitteeCreation
            );
            // Committees of epochs that are already known are silently ignored.
            if epoch == chain_next_epoch {
                self.committees.get_mut().insert(epoch, committee);
                self.epoch.set(Some(epoch));
            }
        }
        RemoveCommittee { epoch } => {
            self.committees.get_mut().remove(&epoch);
        }
        RegisterApplications { applications } => {
            for application in applications {
                self.check_and_record_bytecode_blobs(&application.bytecode_id, txn_tracker)
                    .await?;
                self.registry.register_application(application).await?;
            }
        }
        RequestApplication(application_id) => {
            let applications = self
                .registry
                .describe_applications_with_dependencies(vec![application_id])
                .await?;
            // Reply to the chain that sent the request.
            let message = RawOutgoingMessage {
                destination: Destination::Recipient(context.message_id.chain_id),
                authenticated: false,
                grant: Amount::ZERO,
                kind: MessageKind::Simple,
                message: SystemMessage::RegisterApplications { applications },
            };
            outcome.messages.push(message);
        }
        Subscribe { .. } | Unsubscribe { .. } | OpenChain(_) => {}
        ApplicationCreated => {}
    }
    Ok(outcome)
}
/// Initializes the state of a new child chain created by message `message_id`, using the
/// given timestamp and configuration, and subscribes it to the admin channel of its admin
/// chain.
///
/// # Panics
///
/// Panics if the chain already has a description, an active ownership or any committee, or
/// if recording the admin subscription fails.
pub fn initialize_chain(
    &mut self,
    message_id: MessageId,
    timestamp: Timestamp,
    config: OpenChainConfig,
) {
    // The chain must not have been initialized before.
    assert!(self.description.get().is_none());
    assert!(!self.ownership.get().is_active());
    assert!(self.committees.get().is_empty());
    let admin_chain = config.admin_id;
    self.description
        .set(Some(ChainDescription::Child(message_id)));
    self.epoch.set(Some(config.epoch));
    self.committees.set(config.committees);
    self.admin_id.set(Some(admin_chain));
    let admin_subscription = ChannelSubscription {
        chain_id: admin_chain,
        name: SystemChannel::Admin.name(),
    };
    self.subscriptions
        .insert(&admin_subscription)
        .expect("serialization failed");
    self.ownership.set(config.ownership);
    self.timestamp.set(timestamp);
    self.balance.set(config.balance);
    self.application_permissions
        .set(config.application_permissions);
}
/// Answers a system query with the ID of the queried chain and its current chain balance.
/// The outcome never schedules any operations.
pub async fn handle_query(
    &mut self,
    context: QueryContext,
    _query: SystemQuery,
) -> Result<QueryOutcome<SystemResponse>, SystemExecutionError> {
    let chain_id = context.chain_id;
    let balance = *self.balance.get();
    Ok(QueryOutcome {
        response: SystemResponse { chain_id, balance },
        operations: vec![],
    })
}
pub async fn open_chain(
&mut self,
config: OpenChainConfig,
next_message_id: MessageId,
) -> Result<[RawOutgoingMessage<SystemMessage, Amount>; 2], SystemExecutionError> {
let child_id = ChainId::child(next_message_id);
ensure!(
self.admin_id.get().as_ref() == Some(&config.admin_id),
SystemExecutionError::InvalidNewChainAdminId(child_id)
);
let admin_id = config.admin_id;
ensure!(
self.committees.get() == &config.committees,
SystemExecutionError::InvalidCommittees
);
ensure!(
self.epoch.get().as_ref() == Some(&config.epoch),
SystemExecutionError::InvalidEpoch {
chain_id: child_id,
epoch: config.epoch,
}
);
self.debit(None, config.balance).await?;
let open_chain_message = RawOutgoingMessage {
destination: Destination::Recipient(child_id),
authenticated: false,
grant: Amount::ZERO,
kind: MessageKind::Protected,
message: SystemMessage::OpenChain(config),
};
let subscription = ChannelSubscription {
chain_id: admin_id,
name: SystemChannel::Admin.name(),
};
let subscribe_message = RawOutgoingMessage {
destination: Destination::Recipient(admin_id),
authenticated: false,
grant: Amount::ZERO,
kind: MessageKind::Protected,
message: SystemMessage::Subscribe {
id: child_id,
subscription,
},
};
Ok([open_chain_message, subscribe_message])
}
/// Closes the chain `id`: builds one `Unsubscribe` message per current subscription,
/// clears the subscriptions and marks the chain as closed.
///
/// Returns the `Unsubscribe` messages to send.
pub async fn close_chain(
    &mut self,
    id: ChainId,
) -> Result<Vec<RawOutgoingMessage<SystemMessage, Amount>>, SystemExecutionError> {
    let mut unsubscriptions = Vec::new();
    self.subscriptions
        .for_each_index(|subscription| {
            let publisher = subscription.chain_id;
            unsubscriptions.push(RawOutgoingMessage {
                destination: Destination::Recipient(publisher),
                authenticated: false,
                grant: Amount::ZERO,
                kind: MessageKind::Protected,
                message: SystemMessage::Unsubscribe { id, subscription },
            });
            Ok(())
        })
        .await?;
    self.subscriptions.clear();
    self.closed.set(true);
    Ok(unsubscriptions)
}
/// Registers a new application whose ID combines `bytecode_id` with `next_message_id`.
///
/// The bytecode blobs of the new application and of every required application must exist.
/// Those not yet recorded as used on this chain are returned in `blobs_to_register`,
/// together with the new application ID and the `ApplicationCreated` message.
///
/// # Errors
///
/// Returns `BlobsNotFound` if a bytecode blob is missing, or the error of a view or registry
/// operation.
pub async fn create_application(
    &mut self,
    next_message_id: MessageId,
    bytecode_id: BytecodeId,
    parameters: Vec<u8>,
    required_application_ids: Vec<UserApplicationId>,
) -> Result<CreateApplicationResult, SystemExecutionError> {
    let app_id = UserApplicationId {
        bytecode_id,
        creation: next_message_id,
    };
    let mut blobs_to_register = Vec::new();
    // Dependencies are checked first, then the new application itself.
    let all_applications = required_application_ids.iter().chain(iter::once(&app_id));
    for application in all_applications {
        let (contract_blob_id, service_blob_id) =
            self.check_bytecode_blobs(&application.bytecode_id).await?;
        for blob_id in [contract_blob_id, service_blob_id] {
            let already_used = self.used_blobs.contains(&blob_id).await?;
            if !already_used {
                blobs_to_register.push(blob_id);
            }
        }
    }
    self.registry
        .register_new_application(app_id, parameters, required_application_ids)
        .await?;
    Ok(CreateApplicationResult {
        app_id,
        message: RawOutgoingMessage {
            destination: Destination::Recipient(next_message_id.chain_id),
            authenticated: false,
            grant: Amount::ZERO,
            kind: MessageKind::Protected,
            message: SystemMessage::ApplicationCreated,
        },
        blobs_to_register,
    })
}
/// Records `blob_id` as used on this chain.
///
/// Returns `false` without any effect if the blob was already recorded. Otherwise the blob
/// is added to `used_blobs`, a blob oracle response is replayed through `txn_tracker` if one
/// was provided, and `true` is returned.
pub(crate) async fn blob_used(
    &mut self,
    txn_tracker: Option<&mut TransactionTracker>,
    blob_id: BlobId,
) -> Result<bool, SystemExecutionError> {
    let newly_used = !self.used_blobs.contains(&blob_id).await?;
    if newly_used {
        self.used_blobs.insert(&blob_id)?;
        if let Some(tracker) = txn_tracker {
            tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
        }
    }
    Ok(newly_used)
}
fn blob_published(&mut self, blob_id: &BlobId) -> Result<(), SystemExecutionError> {
self.used_blobs.insert(blob_id)?;
Ok(())
}
/// Reads the content of the blob `blob_id` through the runtime context.
///
/// # Errors
///
/// Returns `BlobsNotFound` listing exactly `blob_id` if the lookup reports missing blobs;
/// any other error is converted into a `SystemExecutionError`.
pub async fn read_blob_content(
    &mut self,
    blob_id: BlobId,
) -> Result<BlobContent, SystemExecutionError> {
    let lookup = self.context().extra().get_blob(blob_id).await;
    lookup
        .map(|blob| blob.into())
        .map_err(|error| match error {
            ViewError::BlobsNotFound(_) => SystemExecutionError::BlobsNotFound(vec![blob_id]),
            other => other.into(),
        })
}
/// Checks through the runtime context that the blob `blob_id` exists.
///
/// # Errors
///
/// Returns `BlobsNotFound` listing `blob_id` if the blob is missing.
pub async fn assert_blob_exists(
    &mut self,
    blob_id: BlobId,
) -> Result<(), SystemExecutionError> {
    let exists = self.context().extra().contains_blob(blob_id).await?;
    if !exists {
        return Err(SystemExecutionError::BlobsNotFound(vec![blob_id]));
    }
    Ok(())
}
async fn check_bytecode_blobs(
&mut self,
bytecode_id: &BytecodeId,
) -> Result<(BlobId, BlobId), SystemExecutionError> {
let contract_bytecode_blob_id =
BlobId::new(bytecode_id.contract_blob_hash, BlobType::ContractBytecode);
let mut missing_blobs = Vec::new();
if !self
.context()
.extra()
.contains_blob(contract_bytecode_blob_id)
.await?
{
missing_blobs.push(contract_bytecode_blob_id);
}
let service_bytecode_blob_id =
BlobId::new(bytecode_id.service_blob_hash, BlobType::ServiceBytecode);
if !self
.context()
.extra()
.contains_blob(service_bytecode_blob_id)
.await?
{
missing_blobs.push(service_bytecode_blob_id);
}
ensure!(
missing_blobs.is_empty(),
SystemExecutionError::BlobsNotFound(missing_blobs)
);
Ok((contract_bytecode_blob_id, service_bytecode_blob_id))
}
/// Records each of the given blobs as used on this chain, in order, stopping at the first
/// error.
async fn record_bytecode_blobs(
    &mut self,
    blob_ids: Vec<BlobId>,
    txn_tracker: &mut TransactionTracker,
) -> Result<(), SystemExecutionError> {
    for bytecode_blob_id in blob_ids {
        // Whether the blob was newly recorded is irrelevant here.
        let _newly_used = self
            .blob_used(Some(txn_tracker), bytecode_blob_id)
            .await?;
    }
    Ok(())
}
/// Checks that both bytecode blobs of `bytecode_id` exist, then records them as used on this
/// chain (contract first, then service).
async fn check_and_record_bytecode_blobs(
    &mut self,
    bytecode_id: &BytecodeId,
    txn_tracker: &mut TransactionTracker,
) -> Result<(), SystemExecutionError> {
    let (contract_blob_id, service_blob_id) = self.check_bytecode_blobs(bytecode_id).await?;
    let bytecode_blobs = vec![contract_blob_id, service_blob_id];
    self.record_bytecode_blobs(bytecode_blobs, txn_tracker).await
}
}