linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashSet},
11    mem,
12};
13
14use custom_debug_derive::Debug;
15use linera_base::{
16    crypto::CryptoHash,
17    data_types::{
18        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
19        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
20    },
21    ensure, hex_debug,
22    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
23    ownership::{ChainOwnership, TimeoutConfig},
24};
25use linera_views::{
26    context::Context,
27    map_view::{HashedMapView, MapView},
28    register_view::HashedRegisterView,
29    set_view::HashedSetView,
30    views::{ClonableView, HashableView, View},
31};
32use serde::{Deserialize, Serialize};
33
34#[cfg(test)]
35use crate::test_utils::SystemExecutionState;
36use crate::{
37    committee::Committee, ApplicationDescription, ApplicationId, ExecutionError,
38    ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
39    QueryContext, QueryOutcome, ResourceController, TransactionTracker,
40};
41
42/// The event stream name for new epochs and committees.
43pub static EPOCH_STREAM_NAME: &[u8] = &[0];
44/// The event stream name for removed epochs.
45pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
46
47/// The number of times the [`SystemOperation::OpenChain`] was executed.
48#[cfg(with_metrics)]
49mod metrics {
50    use std::sync::LazyLock;
51
52    use linera_base::prometheus_util::register_int_counter_vec;
53    use prometheus::IntCounterVec;
54
55    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
56        register_int_counter_vec(
57            "open_chain_count",
58            "The number of times the `OpenChain` operation was executed",
59            &[],
60        )
61    });
62}
63
64/// A view accessing the execution state of the system of a chain.
65#[derive(Debug, ClonableView, HashableView)]
66pub struct SystemExecutionStateView<C> {
67    /// How the chain was created. May be unknown for inactive chains.
68    pub description: HashedRegisterView<C, Option<ChainDescription>>,
69    /// The number identifying the current configuration.
70    pub epoch: HashedRegisterView<C, Epoch>,
71    /// The admin of the chain.
72    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
73    /// The committees that we trust, indexed by epoch number.
74    // Not using a `MapView` because the set active of committees is supposed to be
75    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
76    // (e.g. the `OpenChain` operation).
77    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
78    /// Ownership of the chain.
79    pub ownership: HashedRegisterView<C, ChainOwnership>,
80    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
81    pub balance: HashedRegisterView<C, Amount>,
82    /// Balances attributed to a given owner.
83    pub balances: HashedMapView<C, AccountOwner, Amount>,
84    /// The timestamp of the most recent block.
85    pub timestamp: HashedRegisterView<C, Timestamp>,
86    /// Whether this chain has been closed.
87    pub closed: HashedRegisterView<C, bool>,
88    /// Permissions for applications on this chain.
89    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
90    /// Blobs that have been used or published on this chain.
91    pub used_blobs: HashedSetView<C, BlobId>,
92    /// The event stream subscriptions of applications on this chain.
93    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
94}
95
96/// The applications subscribing to a particular stream, and the next event index.
97#[derive(Debug, Default, Clone, Serialize, Deserialize)]
98pub struct EventSubscriptions {
99    /// The next event index, i.e. the total number of events in this stream that have already
100    /// been processed by this chain.
101    pub next_index: u32,
102    /// The applications that are subscribed to this stream.
103    pub applications: BTreeSet<ApplicationId>,
104}
105
106/// The initial configuration for a new chain.
107#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
108pub struct OpenChainConfig {
109    /// The ownership configuration of the new chain.
110    pub ownership: ChainOwnership,
111    /// The initial chain balance.
112    pub balance: Amount,
113    /// The initial application permissions.
114    pub application_permissions: ApplicationPermissions,
115}
116
117impl OpenChainConfig {
118    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
119    /// parameters.
120    pub fn init_chain_config(
121        &self,
122        epoch: Epoch,
123        committees: BTreeMap<Epoch, Vec<u8>>,
124    ) -> InitialChainConfig {
125        InitialChainConfig {
126            application_permissions: self.application_permissions.clone(),
127            balance: self.balance,
128            committees,
129            epoch,
130            ownership: self.ownership.clone(),
131        }
132    }
133}
134
135/// A system operation.
136#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
137pub enum SystemOperation {
138    /// Transfers `amount` units of value from the given owner's account to the recipient.
139    /// If no owner is given, try to take the units out of the unattributed account.
140    Transfer {
141        owner: AccountOwner,
142        recipient: Recipient,
143        amount: Amount,
144    },
145    /// Claims `amount` units of value from the given owner's account in the remote
146    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
147    /// process the message.
148    Claim {
149        owner: AccountOwner,
150        target_id: ChainId,
151        recipient: Recipient,
152        amount: Amount,
153    },
154    /// Creates (or activates) a new chain.
155    /// This will automatically subscribe to the future committees created by `admin_id`.
156    OpenChain(OpenChainConfig),
157    /// Closes the chain.
158    CloseChain,
159    /// Changes the ownership of the chain.
160    ChangeOwnership {
161        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
162        #[debug(skip_if = Vec::is_empty)]
163        super_owners: Vec<AccountOwner>,
164        /// The regular owners, with their weights that determine how often they are round leader.
165        #[debug(skip_if = Vec::is_empty)]
166        owners: Vec<(AccountOwner, u64)>,
167        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
168        multi_leader_rounds: u32,
169        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
170        /// This should only be `true` on chains with restrictive application permissions and an
171        /// application-based mechanism to select block proposers.
172        open_multi_leader_rounds: bool,
173        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
174        timeout_config: TimeoutConfig,
175    },
176    /// Changes the application permissions configuration on this chain.
177    ChangeApplicationPermissions(ApplicationPermissions),
178    /// Publishes a new application module.
179    PublishModule { module_id: ModuleId },
180    /// Publishes a new data blob.
181    PublishDataBlob { blob_hash: CryptoHash },
182    /// Reads a blob and discards the result.
183    // TODO(#2490): Consider removing this.
184    ReadBlob { blob_id: BlobId },
185    /// Creates a new application.
186    CreateApplication {
187        module_id: ModuleId,
188        #[serde(with = "serde_bytes")]
189        #[debug(with = "hex_debug")]
190        parameters: Vec<u8>,
191        #[serde(with = "serde_bytes")]
192        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
193        instantiation_argument: Vec<u8>,
194        #[debug(skip_if = Vec::is_empty)]
195        required_application_ids: Vec<ApplicationId>,
196    },
197    /// Operations that are only allowed on the admin chain.
198    Admin(AdminOperation),
199    /// Processes an event about a new epoch and committee.
200    ProcessNewEpoch(Epoch),
201    /// Processes an event about a removed epoch and committee.
202    ProcessRemovedEpoch(Epoch),
203    /// Updates the event stream trackers.
204    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
205}
206
207/// Operations that are only allowed on the admin chain.
208#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
209pub enum AdminOperation {
210    /// Publishes a new committee as a blob. This can be assigned to an epoch using
211    /// [`AdminOperation::CreateCommittee`] in a later block.
212    PublishCommitteeBlob { blob_hash: CryptoHash },
213    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
214    /// [`SystemOperation::ProcessNewEpoch`].
215    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
216    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
217    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
218    /// re-certified) by a block certified by a recent committee.
219    RemoveCommittee { epoch: Epoch },
220}
221
222/// A system message meant to be executed on a remote chain.
223#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
224pub enum SystemMessage {
225    /// Credits `amount` units of value to the account `target` -- unless the message is
226    /// bouncing, in which case `source` is credited instead.
227    Credit {
228        target: AccountOwner,
229        amount: Amount,
230        source: AccountOwner,
231    },
232    /// Withdraws `amount` units of value from the account and starts a transfer to credit
233    /// the recipient. The message must be properly authenticated. Receiver chains may
234    /// refuse it depending on their configuration.
235    Withdraw {
236        owner: AccountOwner,
237        amount: Amount,
238        recipient: Recipient,
239    },
240    /// Notifies that a new application was created.
241    ApplicationCreated,
242}
243
244/// A query to the system state.
245#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
246pub struct SystemQuery;
247
248/// The response to a system query.
249#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
250pub struct SystemResponse {
251    pub chain_id: ChainId,
252    pub balance: Amount,
253}
254
255/// The recipient of a transfer.
256#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
257pub enum Recipient {
258    /// This is mainly a placeholder for future extensions.
259    Burn,
260    /// Transfers to the balance of the given account.
261    Account(Account),
262}
263
264impl Recipient {
265    /// Returns the default recipient for the given chain (no owner).
266    pub fn chain(chain_id: ChainId) -> Recipient {
267        Recipient::Account(Account::chain(chain_id))
268    }
269}
270
271impl From<ChainId> for Recipient {
272    fn from(chain_id: ChainId) -> Self {
273        Recipient::chain(chain_id)
274    }
275}
276
277impl From<Account> for Recipient {
278    fn from(account: Account) -> Self {
279        Recipient::Account(account)
280    }
281}
282
283/// Optional user message attached to a transfer.
284#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
285pub struct UserData(pub Option<[u8; 32]>);
286
287impl UserData {
288    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
289        // Convert the Option<String> to Option<[u8; 32]>
290        let option_array = match opt_str {
291            Some(s) => {
292                // Convert the String to a Vec<u8>
293                let vec = s.into_bytes();
294                if vec.len() <= 32 {
295                    // Create an array from the Vec<u8>
296                    let mut array = [b' '; 32];
297
298                    // Copy bytes from the vector into the array
299                    let len = vec.len().min(32);
300                    array[..len].copy_from_slice(&vec[..len]);
301
302                    Some(array)
303                } else {
304                    return Err(vec.len());
305                }
306            }
307            None => None,
308        };
309
310        // Return the UserData with the converted Option<[u8; 32]>
311        Ok(UserData(option_array))
312    }
313}
314
315#[derive(Debug)]
316pub struct CreateApplicationResult {
317    pub app_id: ApplicationId,
318    pub txn_tracker: TransactionTracker,
319}
320
321impl<C> SystemExecutionStateView<C>
322where
323    C: Context + Clone + Send + Sync + 'static,
324    C::Extra: ExecutionRuntimeContext,
325{
326    /// Invariant for the states of active chains.
327    pub fn is_active(&self) -> bool {
328        self.description.get().is_some()
329            && self.ownership.get().is_active()
330            && self.current_committee().is_some()
331            && self.admin_id.get().is_some()
332    }
333
334    /// Returns the current committee, if any.
335    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
336        let epoch = self.epoch.get();
337        let committee = self.committees.get().get(epoch)?;
338        Some((*epoch, committee))
339    }
340
341    /// Returns a map of epochs to serialized_committees.
342    pub fn get_committees(&self) -> BTreeMap<Epoch, Vec<u8>> {
343        self.committees
344            .get()
345            .iter()
346            .map(|(epoch, committee)| {
347                let serialized_committee =
348                    bcs::to_bytes(committee).expect("Serializing a committee should not fail!");
349                (*epoch, serialized_committee)
350            })
351            .collect()
352    }
353
354    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
355        match self.context().extra().get_event(event_id.clone()).await? {
356            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
357            Some(vec) => Ok(vec),
358        }
359    }
360
361    /// Executes the sender's side of an operation and returns a list of actions to be
362    /// taken.
363    pub async fn execute_operation(
364        &mut self,
365        context: OperationContext,
366        operation: SystemOperation,
367        txn_tracker: &mut TransactionTracker,
368        resource_controller: &mut ResourceController<Option<AccountOwner>>,
369    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
370        use SystemOperation::*;
371        let mut new_application = None;
372        match operation {
373            OpenChain(config) => {
374                let _chain_id = self
375                    .open_chain(
376                        config,
377                        context.chain_id,
378                        context.height,
379                        context.timestamp,
380                        txn_tracker,
381                    )
382                    .await?;
383                #[cfg(with_metrics)]
384                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
385            }
386            ChangeOwnership {
387                super_owners,
388                owners,
389                multi_leader_rounds,
390                open_multi_leader_rounds,
391                timeout_config,
392            } => {
393                self.ownership.set(ChainOwnership {
394                    super_owners: super_owners.into_iter().collect(),
395                    owners: owners.into_iter().collect(),
396                    multi_leader_rounds,
397                    open_multi_leader_rounds,
398                    timeout_config,
399                });
400            }
401            ChangeApplicationPermissions(application_permissions) => {
402                self.application_permissions.set(application_permissions);
403            }
404            CloseChain => self.close_chain().await?,
405            Transfer {
406                owner,
407                amount,
408                recipient,
409            } => {
410                let maybe_message = self
411                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
412                    .await?;
413                txn_tracker.add_outgoing_messages(maybe_message)?;
414            }
415            Claim {
416                owner,
417                target_id,
418                recipient,
419                amount,
420            } => {
421                let message = self
422                    .claim(
423                        context.authenticated_signer,
424                        None,
425                        owner,
426                        target_id,
427                        recipient,
428                        amount,
429                    )
430                    .await?;
431                txn_tracker.add_outgoing_message(message)?;
432            }
433            Admin(admin_operation) => {
434                ensure!(
435                    *self.admin_id.get() == Some(context.chain_id),
436                    ExecutionError::AdminOperationOnNonAdminChain
437                );
438                match admin_operation {
439                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
440                        self.blob_published(
441                            &BlobId::new(blob_hash, BlobType::Committee),
442                            txn_tracker,
443                        )?;
444                    }
445                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
446                        self.check_next_epoch(epoch)?;
447                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
448                        let committee =
449                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
450                        self.blob_used(txn_tracker, blob_id).await?;
451                        self.committees.get_mut().insert(epoch, committee);
452                        self.epoch.set(epoch);
453                        txn_tracker.add_event(
454                            StreamId::system(EPOCH_STREAM_NAME),
455                            epoch.0,
456                            bcs::to_bytes(&blob_hash)?,
457                        );
458                    }
459                    AdminOperation::RemoveCommittee { epoch } => {
460                        ensure!(
461                            self.committees.get_mut().remove(&epoch).is_some(),
462                            ExecutionError::InvalidCommitteeRemoval
463                        );
464                        txn_tracker.add_event(
465                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
466                            epoch.0,
467                            vec![],
468                        );
469                    }
470                }
471            }
472            PublishModule { module_id } => {
473                for blob_id in module_id.bytecode_blob_ids() {
474                    self.blob_published(&blob_id, txn_tracker)?;
475                }
476            }
477            CreateApplication {
478                module_id,
479                parameters,
480                instantiation_argument,
481                required_application_ids,
482            } => {
483                let txn_tracker_moved = mem::take(txn_tracker);
484                let CreateApplicationResult {
485                    app_id,
486                    txn_tracker: txn_tracker_moved,
487                } = self
488                    .create_application(
489                        context.chain_id,
490                        context.height,
491                        module_id,
492                        parameters,
493                        required_application_ids,
494                        txn_tracker_moved,
495                    )
496                    .await?;
497                *txn_tracker = txn_tracker_moved;
498                new_application = Some((app_id, instantiation_argument));
499            }
500            PublishDataBlob { blob_hash } => {
501                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
502            }
503            ReadBlob { blob_id } => {
504                let content = self.read_blob_content(blob_id).await?;
505                if blob_id.blob_type == BlobType::Data {
506                    resource_controller
507                        .with_state(self)
508                        .await?
509                        .track_blob_read(content.bytes().len() as u64)?;
510                }
511                self.blob_used(txn_tracker, blob_id).await?;
512            }
513            ProcessNewEpoch(epoch) => {
514                self.check_next_epoch(epoch)?;
515                let admin_id = self
516                    .admin_id
517                    .get()
518                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
519                let event_id = EventId {
520                    chain_id: admin_id,
521                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
522                    index: epoch.0,
523                };
524                let bytes = match txn_tracker.next_replayed_oracle_response()? {
525                    None => self.get_event(event_id.clone()).await?,
526                    Some(OracleResponse::Event(recorded_event_id, bytes))
527                        if recorded_event_id == event_id =>
528                    {
529                        bytes
530                    }
531                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
532                };
533                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
534                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
535                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
536                self.blob_used(txn_tracker, blob_id).await?;
537                self.committees.get_mut().insert(epoch, committee);
538                self.epoch.set(epoch);
539            }
540            ProcessRemovedEpoch(epoch) => {
541                ensure!(
542                    self.committees.get_mut().remove(&epoch).is_some(),
543                    ExecutionError::InvalidCommitteeRemoval
544                );
545                let admin_id = self
546                    .admin_id
547                    .get()
548                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
549                let event_id = EventId {
550                    chain_id: admin_id,
551                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
552                    index: epoch.0,
553                };
554                let bytes = match txn_tracker.next_replayed_oracle_response()? {
555                    None => self.get_event(event_id.clone()).await?,
556                    Some(OracleResponse::Event(recorded_event_id, bytes))
557                        if recorded_event_id == event_id =>
558                    {
559                        bytes
560                    }
561                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
562                };
563                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
564            }
565            UpdateStreams(streams) => {
566                for (chain_id, stream_id, next_index) in streams {
567                    let subscriptions = self
568                        .event_subscriptions
569                        .get_mut_or_default(&(chain_id, stream_id.clone()))
570                        .await?;
571                    ensure!(
572                        subscriptions.next_index < next_index,
573                        ExecutionError::OutdatedUpdateStreams
574                    );
575                    for application_id in &subscriptions.applications {
576                        txn_tracker.add_stream_to_process(
577                            *application_id,
578                            chain_id,
579                            stream_id.clone(),
580                            subscriptions.next_index,
581                            next_index,
582                        );
583                    }
584                    subscriptions.next_index = next_index;
585                    let index = next_index
586                        .checked_sub(1)
587                        .ok_or(ArithmeticError::Underflow)?;
588                    let event_id = EventId {
589                        chain_id,
590                        stream_id,
591                        index,
592                    };
593                    ensure!(
594                        self.context()
595                            .extra()
596                            .contains_event(event_id.clone())
597                            .await?,
598                        ExecutionError::EventNotFound(event_id)
599                    );
600                }
601            }
602        }
603
604        Ok(new_application)
605    }
606
607    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
608    /// epoch.
609    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
610        let expected = self.epoch.get().try_add_one()?;
611        ensure!(
612            provided == expected,
613            ExecutionError::InvalidCommitteeEpoch { provided, expected }
614        );
615        Ok(())
616    }
617
618    pub async fn transfer(
619        &mut self,
620        authenticated_signer: Option<AccountOwner>,
621        authenticated_application_id: Option<ApplicationId>,
622        source: AccountOwner,
623        recipient: Recipient,
624        amount: Amount,
625    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
626        if source == AccountOwner::CHAIN {
627            ensure!(
628                authenticated_signer.is_some()
629                    && self
630                        .ownership
631                        .get()
632                        .verify_owner(&authenticated_signer.unwrap()),
633                ExecutionError::UnauthenticatedTransferOwner
634            );
635        } else {
636            ensure!(
637                authenticated_signer == Some(source)
638                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
639                ExecutionError::UnauthenticatedTransferOwner
640            );
641        }
642        ensure!(
643            amount > Amount::ZERO,
644            ExecutionError::IncorrectTransferAmount
645        );
646        self.debit(&source, amount).await?;
647        match recipient {
648            Recipient::Account(account) => {
649                let message = SystemMessage::Credit {
650                    amount,
651                    source,
652                    target: account.owner,
653                };
654                Ok(Some(
655                    OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
656                ))
657            }
658            Recipient::Burn => Ok(None),
659        }
660    }
661
662    pub async fn claim(
663        &self,
664        authenticated_signer: Option<AccountOwner>,
665        authenticated_application_id: Option<ApplicationId>,
666        source: AccountOwner,
667        target_id: ChainId,
668        recipient: Recipient,
669        amount: Amount,
670    ) -> Result<OutgoingMessage, ExecutionError> {
671        ensure!(
672            authenticated_signer == Some(source)
673                || authenticated_application_id.map(AccountOwner::from) == Some(source),
674            ExecutionError::UnauthenticatedClaimOwner
675        );
676        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
677
678        let message = SystemMessage::Withdraw {
679            amount,
680            owner: source,
681            recipient,
682        };
683        Ok(
684            OutgoingMessage::new(target_id, message)
685                .with_authenticated_signer(authenticated_signer),
686        )
687    }
688
689    /// Debits an [`Amount`] of tokens from an account's balance.
690    async fn debit(
691        &mut self,
692        account: &AccountOwner,
693        amount: Amount,
694    ) -> Result<(), ExecutionError> {
695        let balance = if account == &AccountOwner::CHAIN {
696            self.balance.get_mut()
697        } else {
698            self.balances.get_mut(account).await?.ok_or_else(|| {
699                ExecutionError::InsufficientBalance {
700                    balance: Amount::ZERO,
701                    account: *account,
702                }
703            })?
704        };
705
706        balance
707            .try_sub_assign(amount)
708            .map_err(|_| ExecutionError::InsufficientBalance {
709                balance: *balance,
710                account: *account,
711            })?;
712
713        if account != &AccountOwner::CHAIN && balance.is_zero() {
714            self.balances.remove(account)?;
715        }
716
717        Ok(())
718    }
719
720    /// Executes a cross-chain message that represents the recipient's side of an operation.
721    pub async fn execute_message(
722        &mut self,
723        context: MessageContext,
724        message: SystemMessage,
725    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
726        let mut outcome = Vec::new();
727        use SystemMessage::*;
728        match message {
729            Credit {
730                amount,
731                source,
732                target,
733            } => {
734                let receiver = if context.is_bouncing { source } else { target };
735                if receiver == AccountOwner::CHAIN {
736                    let new_balance = self.balance.get().saturating_add(amount);
737                    self.balance.set(new_balance);
738                } else {
739                    let balance = self.balances.get_mut_or_default(&receiver).await?;
740                    *balance = balance.saturating_add(amount);
741                }
742            }
743            Withdraw {
744                amount,
745                owner,
746                recipient,
747            } => {
748                self.debit(&owner, amount).await?;
749                match recipient {
750                    Recipient::Account(account) => {
751                        let message = SystemMessage::Credit {
752                            amount,
753                            source: owner,
754                            target: account.owner,
755                        };
756                        outcome.push(
757                            OutgoingMessage::new(account.chain_id, message)
758                                .with_kind(MessageKind::Tracked),
759                        );
760                    }
761                    Recipient::Burn => (),
762                }
763            }
764            // This message is only a placeholder: Its ID is part of the application ID.
765            ApplicationCreated => {}
766        }
767        Ok(outcome)
768    }
769
770    /// Initializes the system application state on a newly opened chain.
771    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
772    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
773        if self.description.get().is_some() {
774            // already initialized
775            return Ok(true);
776        }
777        let description_blob = self
778            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
779            .await?;
780        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
781        let InitialChainConfig {
782            ownership,
783            epoch,
784            committees,
785            balance,
786            application_permissions,
787        } = description.config().clone();
788        self.timestamp.set(description.timestamp());
789        self.description.set(Some(description));
790        self.epoch.set(epoch);
791        let committees = committees
792            .into_iter()
793            .map(|(epoch, serialized_committee)| {
794                let committee = bcs::from_bytes(&serialized_committee)
795                    .expect("Deserializing a committee shouldn't fail");
796                (epoch, committee)
797            })
798            .collect();
799        self.committees.set(committees);
800        // If `admin_id` is `None`, this chain is its own admin chain.
801        let admin_id = self
802            .context()
803            .extra()
804            .get_network_description()
805            .await?
806            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
807            .admin_chain_id;
808        self.admin_id.set(Some(admin_id));
809        self.ownership.set(ownership);
810        self.balance.set(balance);
811        self.application_permissions.set(application_permissions);
812        Ok(false)
813    }
814
815    pub async fn handle_query(
816        &mut self,
817        context: QueryContext,
818        _query: SystemQuery,
819    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
820        let response = SystemResponse {
821            chain_id: context.chain_id,
822            balance: *self.balance.get(),
823        };
824        Ok(QueryOutcome {
825            response,
826            operations: vec![],
827        })
828    }
829
830    /// Returns the messages to open a new chain, and subtracts the new chain's balance
831    /// from this chain's.
832    pub async fn open_chain(
833        &mut self,
834        config: OpenChainConfig,
835        parent: ChainId,
836        block_height: BlockHeight,
837        timestamp: Timestamp,
838        txn_tracker: &mut TransactionTracker,
839    ) -> Result<ChainId, ExecutionError> {
840        let chain_index = txn_tracker.next_chain_index();
841        let chain_origin = ChainOrigin::Child {
842            parent,
843            block_height,
844            chain_index,
845        };
846        let committees = self.get_committees();
847        let init_chain_config = config.init_chain_config(*self.epoch.get(), committees);
848        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
849        let child_id = chain_description.id();
850        self.debit(&AccountOwner::CHAIN, config.balance).await?;
851        let blob = Blob::new_chain_description(&chain_description);
852        txn_tracker.add_created_blob(blob);
853        Ok(child_id)
854    }
855
856    pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
857        self.closed.set(true);
858        Ok(())
859    }
860
861    pub async fn create_application(
862        &mut self,
863        chain_id: ChainId,
864        block_height: BlockHeight,
865        module_id: ModuleId,
866        parameters: Vec<u8>,
867        required_application_ids: Vec<ApplicationId>,
868        mut txn_tracker: TransactionTracker,
869    ) -> Result<CreateApplicationResult, ExecutionError> {
870        let application_index = txn_tracker.next_application_index();
871
872        let blob_ids = self.check_bytecode_blobs(&module_id).await?;
873        // We only remember to register the blobs that aren't recorded in `used_blobs`
874        // already.
875        for blob_id in blob_ids {
876            self.blob_used(&mut txn_tracker, blob_id).await?;
877        }
878
879        let application_description = ApplicationDescription {
880            module_id,
881            creator_chain_id: chain_id,
882            block_height,
883            application_index,
884            parameters,
885            required_application_ids,
886        };
887        self.check_required_applications(&application_description, &mut txn_tracker)
888            .await?;
889
890        let blob = Blob::new_application_description(&application_description);
891        self.used_blobs.insert(&blob.id())?;
892        txn_tracker.add_created_blob(blob);
893
894        Ok(CreateApplicationResult {
895            app_id: ApplicationId::from(&application_description),
896            txn_tracker,
897        })
898    }
899
900    async fn check_required_applications(
901        &mut self,
902        application_description: &ApplicationDescription,
903        txn_tracker: &mut TransactionTracker,
904    ) -> Result<(), ExecutionError> {
905        // Make sure that referenced applications IDs have been registered.
906        for required_id in &application_description.required_application_ids {
907            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
908        }
909        Ok(())
910    }
911
912    /// Retrieves an application's description.
913    pub async fn describe_application(
914        &mut self,
915        id: ApplicationId,
916        txn_tracker: &mut TransactionTracker,
917    ) -> Result<ApplicationDescription, ExecutionError> {
918        let blob_id = id.description_blob_id();
919        let blob_content = match txn_tracker.created_blobs().get(&blob_id) {
920            Some(blob) => blob.content().clone(),
921            None => self.read_blob_content(blob_id).await?,
922        };
923        self.blob_used(txn_tracker, blob_id).await?;
924        let description: ApplicationDescription = bcs::from_bytes(blob_content.bytes())?;
925
926        let blob_ids = self.check_bytecode_blobs(&description.module_id).await?;
927        // We only remember to register the blobs that aren't recorded in `used_blobs`
928        // already.
929        for blob_id in blob_ids {
930            self.blob_used(txn_tracker, blob_id).await?;
931        }
932
933        self.check_required_applications(&description, txn_tracker)
934            .await?;
935
936        Ok(description)
937    }
938
939    /// Retrieves the recursive dependencies of applications and applies a topological sort.
940    pub async fn find_dependencies(
941        &mut self,
942        mut stack: Vec<ApplicationId>,
943        txn_tracker: &mut TransactionTracker,
944    ) -> Result<Vec<ApplicationId>, ExecutionError> {
945        // What we return at the end.
946        let mut result = Vec::new();
947        // The entries already inserted in `result`.
948        let mut sorted = HashSet::new();
949        // The entries for which dependencies have already been pushed once to the stack.
950        let mut seen = HashSet::new();
951
952        while let Some(id) = stack.pop() {
953            if sorted.contains(&id) {
954                continue;
955            }
956            if seen.contains(&id) {
957                // Second time we see this entry. It was last pushed just before its
958                // dependencies -- which are now fully sorted.
959                sorted.insert(id);
960                result.push(id);
961                continue;
962            }
963            // First time we see this entry:
964            // 1. Mark it so that its dependencies are no longer pushed to the stack.
965            seen.insert(id);
966            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
967            stack.push(id);
968            let app = self.describe_application(id, txn_tracker).await?;
969            for child in app.required_application_ids.iter().rev() {
970                if !seen.contains(child) {
971                    stack.push(*child);
972                }
973            }
974        }
975        Ok(result)
976    }
977
978    /// Records a blob that is used in this block. If this is the first use on this chain, creates
979    /// an oracle response for it.
980    pub(crate) async fn blob_used(
981        &mut self,
982        txn_tracker: &mut TransactionTracker,
983        blob_id: BlobId,
984    ) -> Result<bool, ExecutionError> {
985        if self.used_blobs.contains(&blob_id).await? {
986            return Ok(false); // Nothing to do.
987        }
988        self.used_blobs.insert(&blob_id)?;
989        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
990        Ok(true)
991    }
992
993    /// Records a blob that is published in this block. This does not create an oracle entry, and
994    /// the blob can be used without using an oracle in the future on this chain.
995    fn blob_published(
996        &mut self,
997        blob_id: &BlobId,
998        txn_tracker: &mut TransactionTracker,
999    ) -> Result<(), ExecutionError> {
1000        self.used_blobs.insert(blob_id)?;
1001        txn_tracker.add_published_blob(*blob_id);
1002        Ok(())
1003    }
1004
1005    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1006        match self.context().extra().get_blob(blob_id).await {
1007            Ok(Some(blob)) => Ok(blob.into()),
1008            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1009            Err(error) => Err(error.into()),
1010        }
1011    }
1012
1013    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1014        if self.context().extra().contains_blob(blob_id).await? {
1015            Ok(())
1016        } else {
1017            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1018        }
1019    }
1020
1021    async fn check_bytecode_blobs(
1022        &mut self,
1023        module_id: &ModuleId,
1024    ) -> Result<Vec<BlobId>, ExecutionError> {
1025        let blob_ids = module_id.bytecode_blob_ids();
1026
1027        let mut missing_blobs = Vec::new();
1028        for blob_id in &blob_ids {
1029            if !self.context().extra().contains_blob(*blob_id).await? {
1030                missing_blobs.push(*blob_id);
1031            }
1032        }
1033        ensure!(
1034            missing_blobs.is_empty(),
1035            ExecutionError::BlobsNotFound(missing_blobs)
1036        );
1037
1038        Ok(blob_ids)
1039    }
1040}