linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashSet},
11    mem,
12};
13
14use custom_debug_derive::Debug;
15use linera_base::{
16    crypto::CryptoHash,
17    data_types::{
18        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
19        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
20    },
21    ensure, hex_debug,
22    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
23    ownership::{ChainOwnership, TimeoutConfig},
24};
25use linera_views::{
26    context::Context,
27    map_view::{HashedMapView, MapView},
28    register_view::HashedRegisterView,
29    set_view::HashedSetView,
30    views::{ClonableView, HashableView, View},
31};
32use serde::{Deserialize, Serialize};
33
34#[cfg(test)]
35use crate::test_utils::SystemExecutionState;
36use crate::{
37    committee::Committee, ApplicationDescription, ApplicationId, ExecutionError,
38    ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
39    QueryContext, QueryOutcome, ResourceController, TransactionTracker,
40};
41
42/// The event stream name for new epochs and committees.
43pub static EPOCH_STREAM_NAME: &[u8] = &[0];
44/// The event stream name for removed epochs.
45pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
46
47/// The number of times the [`SystemOperation::OpenChain`] was executed.
48#[cfg(with_metrics)]
49mod metrics {
50    use std::sync::LazyLock;
51
52    use linera_base::prometheus_util::register_int_counter_vec;
53    use prometheus::IntCounterVec;
54
55    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
56        register_int_counter_vec(
57            "open_chain_count",
58            "The number of times the `OpenChain` operation was executed",
59            &[],
60        )
61    });
62}
63
64/// A view accessing the execution state of the system of a chain.
65#[derive(Debug, ClonableView, HashableView)]
66pub struct SystemExecutionStateView<C> {
67    /// How the chain was created. May be unknown for inactive chains.
68    pub description: HashedRegisterView<C, Option<ChainDescription>>,
69    /// The number identifying the current configuration.
70    pub epoch: HashedRegisterView<C, Epoch>,
71    /// The admin of the chain.
72    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
73    /// The committees that we trust, indexed by epoch number.
74    // Not using a `MapView` because the set active of committees is supposed to be
75    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
76    // (e.g. the `OpenChain` operation).
77    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
78    /// Ownership of the chain.
79    pub ownership: HashedRegisterView<C, ChainOwnership>,
80    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
81    pub balance: HashedRegisterView<C, Amount>,
82    /// Balances attributed to a given owner.
83    pub balances: HashedMapView<C, AccountOwner, Amount>,
84    /// The timestamp of the most recent block.
85    pub timestamp: HashedRegisterView<C, Timestamp>,
86    /// Whether this chain has been closed.
87    pub closed: HashedRegisterView<C, bool>,
88    /// Permissions for applications on this chain.
89    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
90    /// Blobs that have been used or published on this chain.
91    pub used_blobs: HashedSetView<C, BlobId>,
92    /// The event stream subscriptions of applications on this chain.
93    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
94}
95
96/// The applications subscribing to a particular stream, and the next event index.
97#[derive(Debug, Default, Clone, Serialize, Deserialize)]
98pub struct EventSubscriptions {
99    /// The next event index, i.e. the total number of events in this stream that have already
100    /// been processed by this chain.
101    pub next_index: u32,
102    /// The applications that are subscribed to this stream.
103    pub applications: BTreeSet<ApplicationId>,
104}
105
106/// The initial configuration for a new chain.
107#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
108pub struct OpenChainConfig {
109    /// The ownership configuration of the new chain.
110    pub ownership: ChainOwnership,
111    /// The initial chain balance.
112    pub balance: Amount,
113    /// The initial application permissions.
114    pub application_permissions: ApplicationPermissions,
115}
116
117impl OpenChainConfig {
118    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
119    /// parameters.
120    pub fn init_chain_config(
121        &self,
122        epoch: Epoch,
123        min_active_epoch: Epoch,
124        max_active_epoch: Epoch,
125    ) -> InitialChainConfig {
126        InitialChainConfig {
127            application_permissions: self.application_permissions.clone(),
128            balance: self.balance,
129            epoch,
130            min_active_epoch,
131            max_active_epoch,
132            ownership: self.ownership.clone(),
133        }
134    }
135}
136
137/// A system operation.
138#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
139pub enum SystemOperation {
140    /// Transfers `amount` units of value from the given owner's account to the recipient.
141    /// If no owner is given, try to take the units out of the unattributed account.
142    Transfer {
143        owner: AccountOwner,
144        recipient: Recipient,
145        amount: Amount,
146    },
147    /// Claims `amount` units of value from the given owner's account in the remote
148    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
149    /// process the message.
150    Claim {
151        owner: AccountOwner,
152        target_id: ChainId,
153        recipient: Recipient,
154        amount: Amount,
155    },
156    /// Creates (or activates) a new chain.
157    /// This will automatically subscribe to the future committees created by `admin_id`.
158    OpenChain(OpenChainConfig),
159    /// Closes the chain.
160    CloseChain,
161    /// Changes the ownership of the chain.
162    ChangeOwnership {
163        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
164        #[debug(skip_if = Vec::is_empty)]
165        super_owners: Vec<AccountOwner>,
166        /// The regular owners, with their weights that determine how often they are round leader.
167        #[debug(skip_if = Vec::is_empty)]
168        owners: Vec<(AccountOwner, u64)>,
169        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
170        multi_leader_rounds: u32,
171        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
172        /// This should only be `true` on chains with restrictive application permissions and an
173        /// application-based mechanism to select block proposers.
174        open_multi_leader_rounds: bool,
175        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
176        timeout_config: TimeoutConfig,
177    },
178    /// Changes the application permissions configuration on this chain.
179    ChangeApplicationPermissions(ApplicationPermissions),
180    /// Publishes a new application module.
181    PublishModule { module_id: ModuleId },
182    /// Publishes a new data blob.
183    PublishDataBlob { blob_hash: CryptoHash },
184    /// Verifies that the given blob exists. Otherwise the block fails.
185    VerifyBlob { blob_id: BlobId },
186    /// Creates a new application.
187    CreateApplication {
188        module_id: ModuleId,
189        #[serde(with = "serde_bytes")]
190        #[debug(with = "hex_debug")]
191        parameters: Vec<u8>,
192        #[serde(with = "serde_bytes")]
193        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
194        instantiation_argument: Vec<u8>,
195        #[debug(skip_if = Vec::is_empty)]
196        required_application_ids: Vec<ApplicationId>,
197    },
198    /// Operations that are only allowed on the admin chain.
199    Admin(AdminOperation),
200    /// Processes an event about a new epoch and committee.
201    ProcessNewEpoch(Epoch),
202    /// Processes an event about a removed epoch and committee.
203    ProcessRemovedEpoch(Epoch),
204    /// Updates the event stream trackers.
205    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
206}
207
208/// Operations that are only allowed on the admin chain.
209#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
210pub enum AdminOperation {
211    /// Publishes a new committee as a blob. This can be assigned to an epoch using
212    /// [`AdminOperation::CreateCommittee`] in a later block.
213    PublishCommitteeBlob { blob_hash: CryptoHash },
214    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
215    /// [`SystemOperation::ProcessNewEpoch`].
216    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
217    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
218    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
219    /// re-certified) by a block certified by a recent committee.
220    RemoveCommittee { epoch: Epoch },
221}
222
223/// A system message meant to be executed on a remote chain.
224#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
225pub enum SystemMessage {
226    /// Credits `amount` units of value to the account `target` -- unless the message is
227    /// bouncing, in which case `source` is credited instead.
228    Credit {
229        target: AccountOwner,
230        amount: Amount,
231        source: AccountOwner,
232    },
233    /// Withdraws `amount` units of value from the account and starts a transfer to credit
234    /// the recipient. The message must be properly authenticated. Receiver chains may
235    /// refuse it depending on their configuration.
236    Withdraw {
237        owner: AccountOwner,
238        amount: Amount,
239        recipient: Recipient,
240    },
241    /// Notifies that a new application was created.
242    ApplicationCreated,
243}
244
245/// A query to the system state.
246#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
247pub struct SystemQuery;
248
249/// The response to a system query.
250#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
251pub struct SystemResponse {
252    pub chain_id: ChainId,
253    pub balance: Amount,
254}
255
256/// The recipient of a transfer.
257#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
258pub enum Recipient {
259    /// This is mainly a placeholder for future extensions.
260    Burn,
261    /// Transfers to the balance of the given account.
262    Account(Account),
263}
264
265impl Recipient {
266    /// Returns the default recipient for the given chain (no owner).
267    pub fn chain(chain_id: ChainId) -> Recipient {
268        Recipient::Account(Account::chain(chain_id))
269    }
270}
271
272impl From<ChainId> for Recipient {
273    fn from(chain_id: ChainId) -> Self {
274        Recipient::chain(chain_id)
275    }
276}
277
278impl From<Account> for Recipient {
279    fn from(account: Account) -> Self {
280        Recipient::Account(account)
281    }
282}
283
284/// Optional user message attached to a transfer.
285#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
286pub struct UserData(pub Option<[u8; 32]>);
287
288impl UserData {
289    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
290        // Convert the Option<String> to Option<[u8; 32]>
291        let option_array = match opt_str {
292            Some(s) => {
293                // Convert the String to a Vec<u8>
294                let vec = s.into_bytes();
295                if vec.len() <= 32 {
296                    // Create an array from the Vec<u8>
297                    let mut array = [b' '; 32];
298
299                    // Copy bytes from the vector into the array
300                    let len = vec.len().min(32);
301                    array[..len].copy_from_slice(&vec[..len]);
302
303                    Some(array)
304                } else {
305                    return Err(vec.len());
306                }
307            }
308            None => None,
309        };
310
311        // Return the UserData with the converted Option<[u8; 32]>
312        Ok(UserData(option_array))
313    }
314}
315
316#[derive(Debug)]
317pub struct CreateApplicationResult {
318    pub app_id: ApplicationId,
319    pub txn_tracker: TransactionTracker,
320}
321
322impl<C> SystemExecutionStateView<C>
323where
324    C: Context + Clone + Send + Sync + 'static,
325    C::Extra: ExecutionRuntimeContext,
326{
327    /// Invariant for the states of active chains.
328    pub fn is_active(&self) -> bool {
329        self.description.get().is_some()
330            && self.ownership.get().is_active()
331            && self.current_committee().is_some()
332            && self.admin_id.get().is_some()
333    }
334
335    /// Returns the current committee, if any.
336    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
337        let epoch = self.epoch.get();
338        let committee = self.committees.get().get(epoch)?;
339        Some((*epoch, committee))
340    }
341
342    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
343        match self.context().extra().get_event(event_id.clone()).await? {
344            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
345            Some(vec) => Ok(vec),
346        }
347    }
348
349    /// Executes the sender's side of an operation and returns a list of actions to be
350    /// taken.
351    pub async fn execute_operation(
352        &mut self,
353        context: OperationContext,
354        operation: SystemOperation,
355        txn_tracker: &mut TransactionTracker,
356        resource_controller: &mut ResourceController<Option<AccountOwner>>,
357    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
358        use SystemOperation::*;
359        let mut new_application = None;
360        match operation {
361            OpenChain(config) => {
362                let _chain_id = self
363                    .open_chain(
364                        config,
365                        context.chain_id,
366                        context.height,
367                        context.timestamp,
368                        txn_tracker,
369                    )
370                    .await?;
371                #[cfg(with_metrics)]
372                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
373            }
374            ChangeOwnership {
375                super_owners,
376                owners,
377                multi_leader_rounds,
378                open_multi_leader_rounds,
379                timeout_config,
380            } => {
381                self.ownership.set(ChainOwnership {
382                    super_owners: super_owners.into_iter().collect(),
383                    owners: owners.into_iter().collect(),
384                    multi_leader_rounds,
385                    open_multi_leader_rounds,
386                    timeout_config,
387                });
388            }
389            ChangeApplicationPermissions(application_permissions) => {
390                self.application_permissions.set(application_permissions);
391            }
392            CloseChain => self.close_chain().await?,
393            Transfer {
394                owner,
395                amount,
396                recipient,
397            } => {
398                let maybe_message = self
399                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
400                    .await?;
401                txn_tracker.add_outgoing_messages(maybe_message)?;
402            }
403            Claim {
404                owner,
405                target_id,
406                recipient,
407                amount,
408            } => {
409                let message = self
410                    .claim(
411                        context.authenticated_signer,
412                        None,
413                        owner,
414                        target_id,
415                        recipient,
416                        amount,
417                    )
418                    .await?;
419                txn_tracker.add_outgoing_message(message)?;
420            }
421            Admin(admin_operation) => {
422                ensure!(
423                    *self.admin_id.get() == Some(context.chain_id),
424                    ExecutionError::AdminOperationOnNonAdminChain
425                );
426                match admin_operation {
427                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
428                        self.blob_published(
429                            &BlobId::new(blob_hash, BlobType::Committee),
430                            txn_tracker,
431                        )?;
432                    }
433                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
434                        self.check_next_epoch(epoch)?;
435                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
436                        let committee =
437                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
438                        self.blob_used(txn_tracker, blob_id).await?;
439                        self.committees.get_mut().insert(epoch, committee);
440                        self.epoch.set(epoch);
441                        txn_tracker.add_event(
442                            StreamId::system(EPOCH_STREAM_NAME),
443                            epoch.0,
444                            bcs::to_bytes(&blob_hash)?,
445                        );
446                    }
447                    AdminOperation::RemoveCommittee { epoch } => {
448                        ensure!(
449                            self.committees.get_mut().remove(&epoch).is_some(),
450                            ExecutionError::InvalidCommitteeRemoval
451                        );
452                        txn_tracker.add_event(
453                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
454                            epoch.0,
455                            vec![],
456                        );
457                    }
458                }
459            }
460            PublishModule { module_id } => {
461                for blob_id in module_id.bytecode_blob_ids() {
462                    self.blob_published(&blob_id, txn_tracker)?;
463                }
464            }
465            CreateApplication {
466                module_id,
467                parameters,
468                instantiation_argument,
469                required_application_ids,
470            } => {
471                let txn_tracker_moved = mem::take(txn_tracker);
472                let CreateApplicationResult {
473                    app_id,
474                    txn_tracker: txn_tracker_moved,
475                } = self
476                    .create_application(
477                        context.chain_id,
478                        context.height,
479                        module_id,
480                        parameters,
481                        required_application_ids,
482                        txn_tracker_moved,
483                    )
484                    .await?;
485                *txn_tracker = txn_tracker_moved;
486                new_application = Some((app_id, instantiation_argument));
487            }
488            PublishDataBlob { blob_hash } => {
489                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
490            }
491            VerifyBlob { blob_id } => {
492                self.assert_blob_exists(blob_id).await?;
493                resource_controller
494                    .with_state(self)
495                    .await?
496                    .track_blob_read(0)?;
497                self.blob_used(txn_tracker, blob_id).await?;
498            }
499            ProcessNewEpoch(epoch) => {
500                self.check_next_epoch(epoch)?;
501                let admin_id = self
502                    .admin_id
503                    .get()
504                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
505                let event_id = EventId {
506                    chain_id: admin_id,
507                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
508                    index: epoch.0,
509                };
510                let bytes = match txn_tracker.next_replayed_oracle_response()? {
511                    None => self.get_event(event_id.clone()).await?,
512                    Some(OracleResponse::Event(recorded_event_id, bytes))
513                        if recorded_event_id == event_id =>
514                    {
515                        bytes
516                    }
517                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
518                };
519                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
520                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
521                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
522                self.blob_used(txn_tracker, blob_id).await?;
523                self.committees.get_mut().insert(epoch, committee);
524                self.epoch.set(epoch);
525            }
526            ProcessRemovedEpoch(epoch) => {
527                ensure!(
528                    self.committees.get_mut().remove(&epoch).is_some(),
529                    ExecutionError::InvalidCommitteeRemoval
530                );
531                let admin_id = self
532                    .admin_id
533                    .get()
534                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
535                let event_id = EventId {
536                    chain_id: admin_id,
537                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
538                    index: epoch.0,
539                };
540                let bytes = match txn_tracker.next_replayed_oracle_response()? {
541                    None => self.get_event(event_id.clone()).await?,
542                    Some(OracleResponse::Event(recorded_event_id, bytes))
543                        if recorded_event_id == event_id =>
544                    {
545                        bytes
546                    }
547                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
548                };
549                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
550            }
551            UpdateStreams(streams) => {
552                let mut missing_events = Vec::new();
553                for (chain_id, stream_id, next_index) in streams {
554                    let subscriptions = self
555                        .event_subscriptions
556                        .get_mut_or_default(&(chain_id, stream_id.clone()))
557                        .await?;
558                    ensure!(
559                        subscriptions.next_index < next_index,
560                        ExecutionError::OutdatedUpdateStreams
561                    );
562                    for application_id in &subscriptions.applications {
563                        txn_tracker.add_stream_to_process(
564                            *application_id,
565                            chain_id,
566                            stream_id.clone(),
567                            subscriptions.next_index,
568                            next_index,
569                        );
570                    }
571                    subscriptions.next_index = next_index;
572                    let index = next_index
573                        .checked_sub(1)
574                        .ok_or(ArithmeticError::Underflow)?;
575                    let event_id = EventId {
576                        chain_id,
577                        stream_id,
578                        index,
579                    };
580                    if !self
581                        .context()
582                        .extra()
583                        .contains_event(event_id.clone())
584                        .await?
585                    {
586                        missing_events.push(event_id)
587                    }
588                }
589                ensure!(
590                    missing_events.is_empty(),
591                    ExecutionError::EventsNotFound(missing_events)
592                );
593            }
594        }
595
596        Ok(new_application)
597    }
598
599    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
600    /// epoch.
601    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
602        let expected = self.epoch.get().try_add_one()?;
603        ensure!(
604            provided == expected,
605            ExecutionError::InvalidCommitteeEpoch { provided, expected }
606        );
607        Ok(())
608    }
609
610    pub async fn transfer(
611        &mut self,
612        authenticated_signer: Option<AccountOwner>,
613        authenticated_application_id: Option<ApplicationId>,
614        source: AccountOwner,
615        recipient: Recipient,
616        amount: Amount,
617    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
618        if source == AccountOwner::CHAIN {
619            ensure!(
620                authenticated_signer.is_some()
621                    && self
622                        .ownership
623                        .get()
624                        .verify_owner(&authenticated_signer.unwrap()),
625                ExecutionError::UnauthenticatedTransferOwner
626            );
627        } else {
628            ensure!(
629                authenticated_signer == Some(source)
630                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
631                ExecutionError::UnauthenticatedTransferOwner
632            );
633        }
634        ensure!(
635            amount > Amount::ZERO,
636            ExecutionError::IncorrectTransferAmount
637        );
638        self.debit(&source, amount).await?;
639        match recipient {
640            Recipient::Account(account) => {
641                let message = SystemMessage::Credit {
642                    amount,
643                    source,
644                    target: account.owner,
645                };
646                Ok(Some(
647                    OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
648                ))
649            }
650            Recipient::Burn => Ok(None),
651        }
652    }
653
654    pub async fn claim(
655        &self,
656        authenticated_signer: Option<AccountOwner>,
657        authenticated_application_id: Option<ApplicationId>,
658        source: AccountOwner,
659        target_id: ChainId,
660        recipient: Recipient,
661        amount: Amount,
662    ) -> Result<OutgoingMessage, ExecutionError> {
663        ensure!(
664            authenticated_signer == Some(source)
665                || authenticated_application_id.map(AccountOwner::from) == Some(source),
666            ExecutionError::UnauthenticatedClaimOwner
667        );
668        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
669
670        let message = SystemMessage::Withdraw {
671            amount,
672            owner: source,
673            recipient,
674        };
675        Ok(
676            OutgoingMessage::new(target_id, message)
677                .with_authenticated_signer(authenticated_signer),
678        )
679    }
680
681    /// Debits an [`Amount`] of tokens from an account's balance.
682    async fn debit(
683        &mut self,
684        account: &AccountOwner,
685        amount: Amount,
686    ) -> Result<(), ExecutionError> {
687        let balance = if account == &AccountOwner::CHAIN {
688            self.balance.get_mut()
689        } else {
690            self.balances.get_mut(account).await?.ok_or_else(|| {
691                ExecutionError::InsufficientBalance {
692                    balance: Amount::ZERO,
693                    account: *account,
694                }
695            })?
696        };
697
698        balance
699            .try_sub_assign(amount)
700            .map_err(|_| ExecutionError::InsufficientBalance {
701                balance: *balance,
702                account: *account,
703            })?;
704
705        if account != &AccountOwner::CHAIN && balance.is_zero() {
706            self.balances.remove(account)?;
707        }
708
709        Ok(())
710    }
711
712    /// Executes a cross-chain message that represents the recipient's side of an operation.
713    pub async fn execute_message(
714        &mut self,
715        context: MessageContext,
716        message: SystemMessage,
717    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
718        let mut outcome = Vec::new();
719        use SystemMessage::*;
720        match message {
721            Credit {
722                amount,
723                source,
724                target,
725            } => {
726                let receiver = if context.is_bouncing { source } else { target };
727                if receiver == AccountOwner::CHAIN {
728                    let new_balance = self.balance.get().saturating_add(amount);
729                    self.balance.set(new_balance);
730                } else {
731                    let balance = self.balances.get_mut_or_default(&receiver).await?;
732                    *balance = balance.saturating_add(amount);
733                }
734            }
735            Withdraw {
736                amount,
737                owner,
738                recipient,
739            } => {
740                self.debit(&owner, amount).await?;
741                match recipient {
742                    Recipient::Account(account) => {
743                        let message = SystemMessage::Credit {
744                            amount,
745                            source: owner,
746                            target: account.owner,
747                        };
748                        outcome.push(
749                            OutgoingMessage::new(account.chain_id, message)
750                                .with_kind(MessageKind::Tracked),
751                        );
752                    }
753                    Recipient::Burn => (),
754                }
755            }
756            // This message is only a placeholder: Its ID is part of the application ID.
757            ApplicationCreated => {}
758        }
759        Ok(outcome)
760    }
761
762    /// Initializes the system application state on a newly opened chain.
763    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
764    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
765        if self.description.get().is_some() {
766            // already initialized
767            return Ok(true);
768        }
769        let description_blob = self
770            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
771            .await?;
772        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
773        let InitialChainConfig {
774            ownership,
775            epoch,
776            balance,
777            min_active_epoch,
778            max_active_epoch,
779            application_permissions,
780        } = description.config().clone();
781        self.timestamp.set(description.timestamp());
782        self.description.set(Some(description));
783        self.epoch.set(epoch);
784        let committees = self
785            .context()
786            .extra()
787            .committees_for(min_active_epoch..=max_active_epoch)
788            .await?;
789        self.committees.set(committees);
790        let admin_id = self
791            .context()
792            .extra()
793            .get_network_description()
794            .await?
795            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
796            .admin_chain_id;
797        self.admin_id.set(Some(admin_id));
798        self.ownership.set(ownership);
799        self.balance.set(balance);
800        self.application_permissions.set(application_permissions);
801        Ok(false)
802    }
803
804    pub async fn handle_query(
805        &mut self,
806        context: QueryContext,
807        _query: SystemQuery,
808    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
809        let response = SystemResponse {
810            chain_id: context.chain_id,
811            balance: *self.balance.get(),
812        };
813        Ok(QueryOutcome {
814            response,
815            operations: vec![],
816        })
817    }
818
819    /// Returns the messages to open a new chain, and subtracts the new chain's balance
820    /// from this chain's.
821    pub async fn open_chain(
822        &mut self,
823        config: OpenChainConfig,
824        parent: ChainId,
825        block_height: BlockHeight,
826        timestamp: Timestamp,
827        txn_tracker: &mut TransactionTracker,
828    ) -> Result<ChainId, ExecutionError> {
829        let chain_index = txn_tracker.next_chain_index();
830        let chain_origin = ChainOrigin::Child {
831            parent,
832            block_height,
833            chain_index,
834        };
835        let init_chain_config = config.init_chain_config(
836            *self.epoch.get(),
837            self.committees
838                .get()
839                .keys()
840                .min()
841                .copied()
842                .unwrap_or(Epoch::ZERO),
843            self.committees
844                .get()
845                .keys()
846                .max()
847                .copied()
848                .unwrap_or(Epoch::ZERO),
849        );
850        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
851        let child_id = chain_description.id();
852        self.debit(&AccountOwner::CHAIN, config.balance).await?;
853        let blob = Blob::new_chain_description(&chain_description);
854        txn_tracker.add_created_blob(blob);
855        Ok(child_id)
856    }
857
858    pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
859        self.closed.set(true);
860        Ok(())
861    }
862
863    pub async fn create_application(
864        &mut self,
865        chain_id: ChainId,
866        block_height: BlockHeight,
867        module_id: ModuleId,
868        parameters: Vec<u8>,
869        required_application_ids: Vec<ApplicationId>,
870        mut txn_tracker: TransactionTracker,
871    ) -> Result<CreateApplicationResult, ExecutionError> {
872        let application_index = txn_tracker.next_application_index();
873
874        let blob_ids = self.check_bytecode_blobs(&module_id).await?;
875        // We only remember to register the blobs that aren't recorded in `used_blobs`
876        // already.
877        for blob_id in blob_ids {
878            self.blob_used(&mut txn_tracker, blob_id).await?;
879        }
880
881        let application_description = ApplicationDescription {
882            module_id,
883            creator_chain_id: chain_id,
884            block_height,
885            application_index,
886            parameters,
887            required_application_ids,
888        };
889        self.check_required_applications(&application_description, &mut txn_tracker)
890            .await?;
891
892        let blob = Blob::new_application_description(&application_description);
893        self.used_blobs.insert(&blob.id())?;
894        txn_tracker.add_created_blob(blob);
895
896        Ok(CreateApplicationResult {
897            app_id: ApplicationId::from(&application_description),
898            txn_tracker,
899        })
900    }
901
902    async fn check_required_applications(
903        &mut self,
904        application_description: &ApplicationDescription,
905        txn_tracker: &mut TransactionTracker,
906    ) -> Result<(), ExecutionError> {
907        // Make sure that referenced applications IDs have been registered.
908        for required_id in &application_description.required_application_ids {
909            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
910        }
911        Ok(())
912    }
913
914    /// Retrieves an application's description.
915    pub async fn describe_application(
916        &mut self,
917        id: ApplicationId,
918        txn_tracker: &mut TransactionTracker,
919    ) -> Result<ApplicationDescription, ExecutionError> {
920        let blob_id = id.description_blob_id();
921        let blob_content = match txn_tracker.created_blobs().get(&blob_id) {
922            Some(blob) => blob.content().clone(),
923            None => self.read_blob_content(blob_id).await?,
924        };
925        self.blob_used(txn_tracker, blob_id).await?;
926        let description: ApplicationDescription = bcs::from_bytes(blob_content.bytes())?;
927
928        let blob_ids = self.check_bytecode_blobs(&description.module_id).await?;
929        // We only remember to register the blobs that aren't recorded in `used_blobs`
930        // already.
931        for blob_id in blob_ids {
932            self.blob_used(txn_tracker, blob_id).await?;
933        }
934
935        self.check_required_applications(&description, txn_tracker)
936            .await?;
937
938        Ok(description)
939    }
940
941    /// Retrieves the recursive dependencies of applications and applies a topological sort.
942    pub async fn find_dependencies(
943        &mut self,
944        mut stack: Vec<ApplicationId>,
945        txn_tracker: &mut TransactionTracker,
946    ) -> Result<Vec<ApplicationId>, ExecutionError> {
947        // What we return at the end.
948        let mut result = Vec::new();
949        // The entries already inserted in `result`.
950        let mut sorted = HashSet::new();
951        // The entries for which dependencies have already been pushed once to the stack.
952        let mut seen = HashSet::new();
953
954        while let Some(id) = stack.pop() {
955            if sorted.contains(&id) {
956                continue;
957            }
958            if seen.contains(&id) {
959                // Second time we see this entry. It was last pushed just before its
960                // dependencies -- which are now fully sorted.
961                sorted.insert(id);
962                result.push(id);
963                continue;
964            }
965            // First time we see this entry:
966            // 1. Mark it so that its dependencies are no longer pushed to the stack.
967            seen.insert(id);
968            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
969            stack.push(id);
970            let app = self.describe_application(id, txn_tracker).await?;
971            for child in app.required_application_ids.iter().rev() {
972                if !seen.contains(child) {
973                    stack.push(*child);
974                }
975            }
976        }
977        Ok(result)
978    }
979
980    /// Records a blob that is used in this block. If this is the first use on this chain, creates
981    /// an oracle response for it.
982    pub(crate) async fn blob_used(
983        &mut self,
984        txn_tracker: &mut TransactionTracker,
985        blob_id: BlobId,
986    ) -> Result<bool, ExecutionError> {
987        if self.used_blobs.contains(&blob_id).await? {
988            return Ok(false); // Nothing to do.
989        }
990        self.used_blobs.insert(&blob_id)?;
991        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
992        Ok(true)
993    }
994
995    /// Records a blob that is published in this block. This does not create an oracle entry, and
996    /// the blob can be used without using an oracle in the future on this chain.
997    fn blob_published(
998        &mut self,
999        blob_id: &BlobId,
1000        txn_tracker: &mut TransactionTracker,
1001    ) -> Result<(), ExecutionError> {
1002        self.used_blobs.insert(blob_id)?;
1003        txn_tracker.add_published_blob(*blob_id);
1004        Ok(())
1005    }
1006
1007    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1008        match self.context().extra().get_blob(blob_id).await {
1009            Ok(Some(blob)) => Ok(blob.into()),
1010            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1011            Err(error) => Err(error.into()),
1012        }
1013    }
1014
1015    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1016        if self.context().extra().contains_blob(blob_id).await? {
1017            Ok(())
1018        } else {
1019            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1020        }
1021    }
1022
1023    async fn check_bytecode_blobs(
1024        &mut self,
1025        module_id: &ModuleId,
1026    ) -> Result<Vec<BlobId>, ExecutionError> {
1027        let blob_ids = module_id.bytecode_blob_ids();
1028
1029        let mut missing_blobs = Vec::new();
1030        for blob_id in &blob_ids {
1031            if !self.context().extra().contains_blob(*blob_id).await? {
1032                missing_blobs.push(*blob_id);
1033            }
1034        }
1035        ensure!(
1036            missing_blobs.is_empty(),
1037            ExecutionError::BlobsNotFound(missing_blobs)
1038        );
1039
1040        Ok(blob_ids)
1041    }
1042}