linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::collections::{BTreeMap, BTreeSet, HashSet};
10
11use allocative::Allocative;
12use custom_debug_derive::Debug;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{
16        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
17        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
18    },
19    ensure, hex_debug,
20    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
21    ownership::{ChainOwnership, TimeoutConfig},
22};
23use linera_views::{
24    context::Context,
25    map_view::MapView,
26    register_view::RegisterView,
27    set_view::SetView,
28    views::{ClonableView, ReplaceContext, View},
29};
30use serde::{Deserialize, Serialize};
31
32#[cfg(test)]
33use crate::test_utils::SystemExecutionState;
34use crate::{
35    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
36    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
37    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
38};
39
40/// The event stream name for new epochs and committees.
41pub static EPOCH_STREAM_NAME: &[u8] = &[0];
42/// The event stream name for removed epochs.
43pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
44
45/// The data stored in an epoch creation event.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct EpochEventData {
48    /// The hash of the committee blob for this epoch.
49    pub blob_hash: CryptoHash,
50    /// The timestamp when the epoch was created on the admin chain.
51    pub timestamp: Timestamp,
52}
53
54/// The number of times the [`SystemOperation::OpenChain`] was executed.
55#[cfg(with_metrics)]
56mod metrics {
57    use std::sync::LazyLock;
58
59    use linera_base::prometheus_util::register_int_counter_vec;
60    use prometheus::IntCounterVec;
61
62    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
63        register_int_counter_vec(
64            "open_chain_count",
65            "The number of times the `OpenChain` operation was executed",
66            &[],
67        )
68    });
69}
70
71/// A view accessing the execution state of the system of a chain.
72#[derive(Debug, ClonableView, View, Allocative)]
73#[allocative(bound = "C")]
74pub struct SystemExecutionStateView<C> {
75    /// How the chain was created. May be unknown for inactive chains.
76    pub description: RegisterView<C, Option<ChainDescription>>,
77    /// The number identifying the current configuration.
78    pub epoch: RegisterView<C, Epoch>,
79    /// The admin of the chain.
80    pub admin_id: RegisterView<C, Option<ChainId>>,
81    /// The committees that we trust, indexed by epoch number.
82    // Not using a `MapView` because the set active of committees is supposed to be
83    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
84    // (e.g. the `OpenChain` operation).
85    pub committees: RegisterView<C, BTreeMap<Epoch, Committee>>,
86    /// Ownership of the chain.
87    pub ownership: RegisterView<C, ChainOwnership>,
88    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
89    pub balance: RegisterView<C, Amount>,
90    /// Balances attributed to a given owner.
91    pub balances: MapView<C, AccountOwner, Amount>,
92    /// The timestamp of the most recent block.
93    pub timestamp: RegisterView<C, Timestamp>,
94    /// Whether this chain has been closed.
95    pub closed: RegisterView<C, bool>,
96    /// Permissions for applications on this chain.
97    pub application_permissions: RegisterView<C, ApplicationPermissions>,
98    /// Blobs that have been used or published on this chain.
99    pub used_blobs: SetView<C, BlobId>,
100    /// The event stream subscriptions of applications on this chain.
101    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
102    /// The number of events in the streams that this chain is writing to.
103    pub stream_event_counts: MapView<C, StreamId, u32>,
104}
105
106impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
107    type Target = SystemExecutionStateView<C2>;
108
109    async fn with_context(
110        &mut self,
111        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
112    ) -> Self::Target {
113        SystemExecutionStateView {
114            description: self.description.with_context(ctx.clone()).await,
115            epoch: self.epoch.with_context(ctx.clone()).await,
116            admin_id: self.admin_id.with_context(ctx.clone()).await,
117            committees: self.committees.with_context(ctx.clone()).await,
118            ownership: self.ownership.with_context(ctx.clone()).await,
119            balance: self.balance.with_context(ctx.clone()).await,
120            balances: self.balances.with_context(ctx.clone()).await,
121            timestamp: self.timestamp.with_context(ctx.clone()).await,
122            closed: self.closed.with_context(ctx.clone()).await,
123            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
124            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
125            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
126            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
127        }
128    }
129}
130
131/// The applications subscribing to a particular stream, and the next event index.
132#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
133pub struct EventSubscriptions {
134    /// The next event index, i.e. the total number of events in this stream that have already
135    /// been processed by this chain.
136    pub next_index: u32,
137    /// The applications that are subscribed to this stream.
138    pub applications: BTreeSet<ApplicationId>,
139}
140
141/// The initial configuration for a new chain.
142#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
143pub struct OpenChainConfig {
144    /// The ownership configuration of the new chain.
145    pub ownership: ChainOwnership,
146    /// The initial chain balance.
147    pub balance: Amount,
148    /// The initial application permissions.
149    pub application_permissions: ApplicationPermissions,
150}
151
152impl OpenChainConfig {
153    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
154    /// parameters.
155    pub fn init_chain_config(
156        &self,
157        epoch: Epoch,
158        min_active_epoch: Epoch,
159        max_active_epoch: Epoch,
160    ) -> InitialChainConfig {
161        InitialChainConfig {
162            application_permissions: self.application_permissions.clone(),
163            balance: self.balance,
164            epoch,
165            min_active_epoch,
166            max_active_epoch,
167            ownership: self.ownership.clone(),
168        }
169    }
170}
171
172/// A system operation.
173#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
174pub enum SystemOperation {
175    /// Transfers `amount` units of value from the given owner's account to the recipient.
176    /// If no owner is given, try to take the units out of the unattributed account.
177    Transfer {
178        owner: AccountOwner,
179        recipient: Account,
180        amount: Amount,
181    },
182    /// Claims `amount` units of value from the given owner's account in the remote
183    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
184    /// process the message.
185    Claim {
186        owner: AccountOwner,
187        target_id: ChainId,
188        recipient: Account,
189        amount: Amount,
190    },
191    /// Creates (or activates) a new chain.
192    /// This will automatically subscribe to the future committees created by `admin_id`.
193    OpenChain(OpenChainConfig),
194    /// Closes the chain.
195    CloseChain,
196    /// Changes the ownership of the chain.
197    ChangeOwnership {
198        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
199        #[debug(skip_if = Vec::is_empty)]
200        super_owners: Vec<AccountOwner>,
201        /// The regular owners, with their weights that determine how often they are round leader.
202        #[debug(skip_if = Vec::is_empty)]
203        owners: Vec<(AccountOwner, u64)>,
204        /// The leader of the first single-leader round. If not set, this is random like other rounds.
205        #[debug(skip_if = Option::is_none)]
206        first_leader: Option<AccountOwner>,
207        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
208        multi_leader_rounds: u32,
209        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
210        /// This should only be `true` on chains with restrictive application permissions and an
211        /// application-based mechanism to select block proposers.
212        open_multi_leader_rounds: bool,
213        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
214        timeout_config: TimeoutConfig,
215    },
216    /// Changes the application permissions configuration on this chain.
217    ChangeApplicationPermissions(ApplicationPermissions),
218    /// Publishes a new application module.
219    PublishModule { module_id: ModuleId },
220    /// Publishes a new data blob.
221    PublishDataBlob { blob_hash: CryptoHash },
222    /// Verifies that the given blob exists. Otherwise the block fails.
223    VerifyBlob { blob_id: BlobId },
224    /// Creates a new application.
225    CreateApplication {
226        module_id: ModuleId,
227        #[serde(with = "serde_bytes")]
228        #[debug(with = "hex_debug")]
229        parameters: Vec<u8>,
230        #[serde(with = "serde_bytes")]
231        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
232        instantiation_argument: Vec<u8>,
233        #[debug(skip_if = Vec::is_empty)]
234        required_application_ids: Vec<ApplicationId>,
235    },
236    /// Operations that are only allowed on the admin chain.
237    Admin(AdminOperation),
238    /// Processes an event about a new epoch and committee.
239    ProcessNewEpoch(Epoch),
240    /// Processes an event about a removed epoch and committee.
241    ProcessRemovedEpoch(Epoch),
242    /// Updates the event stream trackers.
243    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
244}
245
246/// Operations that are only allowed on the admin chain.
247#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
248pub enum AdminOperation {
249    /// Publishes a new committee as a blob. This can be assigned to an epoch using
250    /// [`AdminOperation::CreateCommittee`] in a later block.
251    PublishCommitteeBlob { blob_hash: CryptoHash },
252    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
253    /// [`SystemOperation::ProcessNewEpoch`].
254    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
255    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
256    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
257    /// re-certified) by a block certified by a recent committee.
258    RemoveCommittee { epoch: Epoch },
259}
260
261/// A system message meant to be executed on a remote chain.
262#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
263pub enum SystemMessage {
264    /// Credits `amount` units of value to the account `target` -- unless the message is
265    /// bouncing, in which case `source` is credited instead.
266    Credit {
267        target: AccountOwner,
268        amount: Amount,
269        source: AccountOwner,
270    },
271    /// Withdraws `amount` units of value from the account and starts a transfer to credit
272    /// the recipient. The message must be properly authenticated. Receiver chains may
273    /// refuse it depending on their configuration.
274    Withdraw {
275        owner: AccountOwner,
276        amount: Amount,
277        recipient: Account,
278    },
279}
280
281/// A query to the system state.
282#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
283pub struct SystemQuery;
284
285/// The response to a system query.
286#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
287pub struct SystemResponse {
288    pub chain_id: ChainId,
289    pub balance: Amount,
290}
291
292/// Optional user message attached to a transfer.
293#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
294pub struct UserData(pub Option<[u8; 32]>);
295
296impl UserData {
297    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
298        // Convert the Option<String> to Option<[u8; 32]>
299        let option_array = match opt_str {
300            Some(s) => {
301                // Convert the String to a Vec<u8>
302                let vec = s.into_bytes();
303                if vec.len() <= 32 {
304                    // Create an array from the Vec<u8>
305                    let mut array = [b' '; 32];
306
307                    // Copy bytes from the vector into the array
308                    let len = vec.len().min(32);
309                    array[..len].copy_from_slice(&vec[..len]);
310
311                    Some(array)
312                } else {
313                    return Err(vec.len());
314                }
315            }
316            None => None,
317        };
318
319        // Return the UserData with the converted Option<[u8; 32]>
320        Ok(UserData(option_array))
321    }
322}
323
324#[derive(Debug)]
325pub struct CreateApplicationResult {
326    pub app_id: ApplicationId,
327}
328
329impl<C> SystemExecutionStateView<C>
330where
331    C: Context + Clone + 'static,
332    C::Extra: ExecutionRuntimeContext,
333{
334    /// Invariant for the states of active chains.
335    pub fn is_active(&self) -> bool {
336        self.description.get().is_some()
337            && self.ownership.get().is_active()
338            && self.current_committee().is_some()
339            && self.admin_id.get().is_some()
340    }
341
342    /// Returns the current committee, if any.
343    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
344        let epoch = self.epoch.get();
345        let committee = self.committees.get().get(epoch)?;
346        Some((*epoch, committee))
347    }
348
349    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
350        match self.context().extra().get_event(event_id.clone()).await? {
351            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
352            Some(vec) => Ok(vec),
353        }
354    }
355
356    /// Executes the sender's side of an operation and returns a list of actions to be
357    /// taken.
358    pub async fn execute_operation(
359        &mut self,
360        context: OperationContext,
361        operation: SystemOperation,
362        txn_tracker: &mut TransactionTracker,
363        resource_controller: &mut ResourceController<Option<AccountOwner>>,
364    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
365        use SystemOperation::*;
366        let mut new_application = None;
367        match operation {
368            OpenChain(config) => {
369                let _chain_id = self
370                    .open_chain(
371                        config,
372                        context.chain_id,
373                        context.height,
374                        context.timestamp,
375                        txn_tracker,
376                    )
377                    .await?;
378                #[cfg(with_metrics)]
379                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
380            }
381            ChangeOwnership {
382                super_owners,
383                owners,
384                first_leader,
385                multi_leader_rounds,
386                open_multi_leader_rounds,
387                timeout_config,
388            } => {
389                self.ownership.set(ChainOwnership {
390                    super_owners: super_owners.into_iter().collect(),
391                    owners: owners.into_iter().collect(),
392                    first_leader,
393                    multi_leader_rounds,
394                    open_multi_leader_rounds,
395                    timeout_config,
396                });
397            }
398            ChangeApplicationPermissions(application_permissions) => {
399                self.application_permissions.set(application_permissions);
400            }
401            CloseChain => self.close_chain(),
402            Transfer {
403                owner,
404                amount,
405                recipient,
406            } => {
407                let maybe_message = self
408                    .transfer(context.authenticated_owner, None, owner, recipient, amount)
409                    .await?;
410                txn_tracker.add_outgoing_messages(maybe_message);
411            }
412            Claim {
413                owner,
414                target_id,
415                recipient,
416                amount,
417            } => {
418                let maybe_message = self
419                    .claim(
420                        context.authenticated_owner,
421                        None,
422                        owner,
423                        target_id,
424                        recipient,
425                        amount,
426                    )
427                    .await?;
428                txn_tracker.add_outgoing_messages(maybe_message);
429            }
430            Admin(admin_operation) => {
431                ensure!(
432                    *self.admin_id.get() == Some(context.chain_id),
433                    ExecutionError::AdminOperationOnNonAdminChain
434                );
435                match admin_operation {
436                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
437                        self.blob_published(
438                            &BlobId::new(blob_hash, BlobType::Committee),
439                            txn_tracker,
440                        )?;
441                    }
442                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
443                        self.check_next_epoch(epoch)?;
444                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
445                        let committee =
446                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
447                        self.blob_used(txn_tracker, blob_id).await?;
448                        self.committees.get_mut().insert(epoch, committee);
449                        self.epoch.set(epoch);
450                        let event_data = EpochEventData {
451                            blob_hash,
452                            timestamp: context.timestamp,
453                        };
454                        txn_tracker.add_event(
455                            StreamId::system(EPOCH_STREAM_NAME),
456                            epoch.0,
457                            bcs::to_bytes(&event_data)?,
458                        );
459                    }
460                    AdminOperation::RemoveCommittee { epoch } => {
461                        ensure!(
462                            self.committees.get_mut().remove(&epoch).is_some(),
463                            ExecutionError::InvalidCommitteeRemoval
464                        );
465                        txn_tracker.add_event(
466                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
467                            epoch.0,
468                            vec![],
469                        );
470                    }
471                }
472            }
473            PublishModule { module_id } => {
474                for blob_id in module_id.bytecode_blob_ids() {
475                    self.blob_published(&blob_id, txn_tracker)?;
476                }
477            }
478            CreateApplication {
479                module_id,
480                parameters,
481                instantiation_argument,
482                required_application_ids,
483            } => {
484                let CreateApplicationResult { app_id } = self
485                    .create_application(
486                        context.chain_id,
487                        context.height,
488                        module_id,
489                        parameters,
490                        required_application_ids,
491                        txn_tracker,
492                    )
493                    .await?;
494                new_application = Some((app_id, instantiation_argument));
495            }
496            PublishDataBlob { blob_hash } => {
497                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
498            }
499            VerifyBlob { blob_id } => {
500                self.assert_blob_exists(blob_id).await?;
501                resource_controller
502                    .with_state(self)
503                    .await?
504                    .track_blob_read(0)?;
505                self.blob_used(txn_tracker, blob_id).await?;
506            }
507            ProcessNewEpoch(epoch) => {
508                self.check_next_epoch(epoch)?;
509                let admin_id = self
510                    .admin_id
511                    .get()
512                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
513                let event_id = EventId {
514                    chain_id: admin_id,
515                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
516                    index: epoch.0,
517                };
518                let bytes = txn_tracker
519                    .oracle(|| async {
520                        let bytes = self.get_event(event_id.clone()).await?;
521                        Ok(OracleResponse::Event(event_id.clone(), bytes))
522                    })
523                    .await?
524                    .to_event(&event_id)?;
525                let event_data: EpochEventData = bcs::from_bytes(&bytes)?;
526                let blob_id = BlobId::new(event_data.blob_hash, BlobType::Committee);
527                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
528                self.blob_used(txn_tracker, blob_id).await?;
529                self.committees.get_mut().insert(epoch, committee);
530                self.epoch.set(epoch);
531            }
532            ProcessRemovedEpoch(epoch) => {
533                ensure!(
534                    self.committees.get_mut().remove(&epoch).is_some(),
535                    ExecutionError::InvalidCommitteeRemoval
536                );
537                let admin_id = self
538                    .admin_id
539                    .get()
540                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
541                let event_id = EventId {
542                    chain_id: admin_id,
543                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
544                    index: epoch.0,
545                };
546                txn_tracker
547                    .oracle(|| async {
548                        let bytes = self.get_event(event_id.clone()).await?;
549                        Ok(OracleResponse::Event(event_id, bytes))
550                    })
551                    .await?;
552            }
553            UpdateStreams(streams) => {
554                let mut missing_events = Vec::new();
555                for (chain_id, stream_id, next_index) in streams {
556                    let subscriptions = self
557                        .event_subscriptions
558                        .get_mut_or_default(&(chain_id, stream_id.clone()))
559                        .await?;
560                    ensure!(
561                        subscriptions.next_index < next_index,
562                        ExecutionError::OutdatedUpdateStreams
563                    );
564                    for application_id in &subscriptions.applications {
565                        txn_tracker.add_stream_to_process(
566                            *application_id,
567                            chain_id,
568                            stream_id.clone(),
569                            subscriptions.next_index,
570                            next_index,
571                        );
572                    }
573                    subscriptions.next_index = next_index;
574                    let index = next_index
575                        .checked_sub(1)
576                        .ok_or(ArithmeticError::Underflow)?;
577                    let event_id = EventId {
578                        chain_id,
579                        stream_id,
580                        index,
581                    };
582                    let context = self.context();
583                    let extra = context.extra();
584                    txn_tracker
585                        .oracle(|| async {
586                            if !extra.contains_event(event_id.clone()).await? {
587                                missing_events.push(event_id.clone());
588                            }
589                            Ok(OracleResponse::EventExists(event_id))
590                        })
591                        .await?;
592                }
593                ensure!(
594                    missing_events.is_empty(),
595                    ExecutionError::EventsNotFound(missing_events)
596                );
597            }
598        }
599
600        Ok(new_application)
601    }
602
603    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
604    /// epoch.
605    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
606        let expected = self.epoch.get().try_add_one()?;
607        ensure!(
608            provided == expected,
609            ExecutionError::InvalidCommitteeEpoch { provided, expected }
610        );
611        Ok(())
612    }
613
614    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
615        if owner == &AccountOwner::CHAIN {
616            let new_balance = self.balance.get().saturating_add(amount);
617            self.balance.set(new_balance);
618        } else {
619            let balance = self.balances.get_mut_or_default(owner).await?;
620            *balance = balance.saturating_add(amount);
621        }
622        Ok(())
623    }
624
625    async fn credit_or_send_message(
626        &mut self,
627        source: AccountOwner,
628        recipient: Account,
629        amount: Amount,
630    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
631        let source_chain_id = self.context().extra().chain_id();
632        if recipient.chain_id == source_chain_id {
633            // Handle same-chain transfer locally.
634            let target = recipient.owner;
635            self.credit(&target, amount).await?;
636            Ok(None)
637        } else {
638            // Handle cross-chain transfer with message.
639            let message = SystemMessage::Credit {
640                amount,
641                source,
642                target: recipient.owner,
643            };
644            Ok(Some(
645                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
646            ))
647        }
648    }
649
650    pub async fn transfer(
651        &mut self,
652        authenticated_owner: Option<AccountOwner>,
653        authenticated_application_id: Option<ApplicationId>,
654        source: AccountOwner,
655        recipient: Account,
656        amount: Amount,
657    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
658        if source == AccountOwner::CHAIN {
659            ensure!(
660                authenticated_owner.is_some()
661                    && self.ownership.get().is_owner(&authenticated_owner.unwrap()),
662                ExecutionError::UnauthenticatedTransferOwner
663            );
664        } else {
665            ensure!(
666                authenticated_owner == Some(source)
667                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
668                ExecutionError::UnauthenticatedTransferOwner
669            );
670        }
671        ensure!(
672            amount > Amount::ZERO,
673            ExecutionError::IncorrectTransferAmount
674        );
675        self.debit(&source, amount).await?;
676        self.credit_or_send_message(source, recipient, amount).await
677    }
678
679    pub async fn claim(
680        &mut self,
681        authenticated_owner: Option<AccountOwner>,
682        authenticated_application_id: Option<ApplicationId>,
683        source: AccountOwner,
684        target_id: ChainId,
685        recipient: Account,
686        amount: Amount,
687    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
688        ensure!(
689            authenticated_owner == Some(source)
690                || authenticated_application_id.map(AccountOwner::from) == Some(source),
691            ExecutionError::UnauthenticatedClaimOwner
692        );
693        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
694
695        let current_chain_id = self.context().extra().chain_id();
696        if target_id == current_chain_id {
697            // Handle same-chain claim locally by processing the withdraw operation directly
698            self.debit(&source, amount).await?;
699            self.credit_or_send_message(source, recipient, amount).await
700        } else {
701            // Handle cross-chain claim with Withdraw message
702            let message = SystemMessage::Withdraw {
703                amount,
704                owner: source,
705                recipient,
706            };
707            Ok(Some(
708                OutgoingMessage::new(target_id, message)
709                    .with_authenticated_owner(authenticated_owner),
710            ))
711        }
712    }
713
714    /// Debits an [`Amount`] of tokens from an account's balance.
715    async fn debit(
716        &mut self,
717        account: &AccountOwner,
718        amount: Amount,
719    ) -> Result<(), ExecutionError> {
720        let balance = if account == &AccountOwner::CHAIN {
721            self.balance.get_mut()
722        } else {
723            self.balances.get_mut(account).await?.ok_or_else(|| {
724                ExecutionError::InsufficientBalance {
725                    balance: Amount::ZERO,
726                    account: *account,
727                }
728            })?
729        };
730
731        balance
732            .try_sub_assign(amount)
733            .map_err(|_| ExecutionError::InsufficientBalance {
734                balance: *balance,
735                account: *account,
736            })?;
737
738        if account != &AccountOwner::CHAIN && balance.is_zero() {
739            self.balances.remove(account)?;
740        }
741
742        Ok(())
743    }
744
745    /// Executes a cross-chain message that represents the recipient's side of an operation.
746    pub async fn execute_message(
747        &mut self,
748        context: MessageContext,
749        message: SystemMessage,
750    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
751        let mut outcome = Vec::new();
752        use SystemMessage::*;
753        match message {
754            Credit {
755                amount,
756                source,
757                target,
758            } => {
759                let receiver = if context.is_bouncing { source } else { target };
760                self.credit(&receiver, amount).await?;
761            }
762            Withdraw {
763                amount,
764                owner,
765                recipient,
766            } => {
767                self.debit(&owner, amount).await?;
768                if let Some(message) = self
769                    .credit_or_send_message(owner, recipient, amount)
770                    .await?
771                {
772                    outcome.push(message);
773                }
774            }
775        }
776        Ok(outcome)
777    }
778
779    /// Initializes the system application state on a newly opened chain.
780    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
781    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
782        if self.description.get().is_some() {
783            // already initialized
784            return Ok(true);
785        }
786        let description_blob = self
787            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
788            .await?;
789        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
790        let InitialChainConfig {
791            ownership,
792            epoch,
793            balance,
794            min_active_epoch,
795            max_active_epoch,
796            application_permissions,
797        } = description.config().clone();
798        self.timestamp.set(description.timestamp());
799        self.description.set(Some(description));
800        self.epoch.set(epoch);
801
802        let committees = self
803            .context()
804            .extra()
805            .get_committees(min_active_epoch..=max_active_epoch)
806            .await?;
807        let admin_chain_id = self
808            .context()
809            .extra()
810            .get_network_description()
811            .await?
812            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
813            .admin_chain_id;
814
815        self.committees.set(committees);
816        self.admin_id.set(Some(admin_chain_id));
817        self.ownership.set(ownership);
818        self.balance.set(balance);
819        self.application_permissions.set(application_permissions);
820        Ok(false)
821    }
822
823    pub fn handle_query(
824        &mut self,
825        context: QueryContext,
826        _query: SystemQuery,
827    ) -> QueryOutcome<SystemResponse> {
828        let response = SystemResponse {
829            chain_id: context.chain_id,
830            balance: *self.balance.get(),
831        };
832        QueryOutcome {
833            response,
834            operations: vec![],
835        }
836    }
837
838    /// Returns the messages to open a new chain, and subtracts the new chain's balance
839    /// from this chain's.
840    pub async fn open_chain(
841        &mut self,
842        config: OpenChainConfig,
843        parent: ChainId,
844        block_height: BlockHeight,
845        timestamp: Timestamp,
846        txn_tracker: &mut TransactionTracker,
847    ) -> Result<ChainId, ExecutionError> {
848        let chain_index = txn_tracker.next_chain_index();
849        let chain_origin = ChainOrigin::Child {
850            parent,
851            block_height,
852            chain_index,
853        };
854        let init_chain_config = config.init_chain_config(
855            *self.epoch.get(),
856            self.committees
857                .get()
858                .keys()
859                .min()
860                .copied()
861                .unwrap_or(Epoch::ZERO),
862            self.committees
863                .get()
864                .keys()
865                .max()
866                .copied()
867                .unwrap_or(Epoch::ZERO),
868        );
869        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
870        let child_id = chain_description.id();
871        self.debit(&AccountOwner::CHAIN, config.balance).await?;
872        let blob = Blob::new_chain_description(&chain_description);
873        txn_tracker.add_created_blob(blob);
874        Ok(child_id)
875    }
876
877    pub fn close_chain(&mut self) {
878        self.closed.set(true);
879    }
880
881    pub async fn create_application(
882        &mut self,
883        chain_id: ChainId,
884        block_height: BlockHeight,
885        module_id: ModuleId,
886        parameters: Vec<u8>,
887        required_application_ids: Vec<ApplicationId>,
888        txn_tracker: &mut TransactionTracker,
889    ) -> Result<CreateApplicationResult, ExecutionError> {
890        let application_index = txn_tracker.next_application_index();
891
892        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
893        // We only remember to register the blobs that aren't recorded in `used_blobs`
894        // already.
895        for blob_id in blob_ids {
896            self.blob_used(txn_tracker, blob_id).await?;
897        }
898
899        let application_description = ApplicationDescription {
900            module_id,
901            creator_chain_id: chain_id,
902            block_height,
903            application_index,
904            parameters,
905            required_application_ids,
906        };
907        self.check_required_applications(&application_description, txn_tracker)
908            .await?;
909
910        let blob = Blob::new_application_description(&application_description);
911        self.used_blobs.insert(&blob.id())?;
912        txn_tracker.add_created_blob(blob);
913
914        Ok(CreateApplicationResult {
915            app_id: ApplicationId::from(&application_description),
916        })
917    }
918
919    async fn check_required_applications(
920        &mut self,
921        application_description: &ApplicationDescription,
922        txn_tracker: &mut TransactionTracker,
923    ) -> Result<(), ExecutionError> {
924        // Make sure that referenced applications IDs have been registered.
925        for required_id in &application_description.required_application_ids {
926            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
927        }
928        Ok(())
929    }
930
931    /// Retrieves an application's description.
932    pub async fn describe_application(
933        &mut self,
934        id: ApplicationId,
935        txn_tracker: &mut TransactionTracker,
936    ) -> Result<ApplicationDescription, ExecutionError> {
937        let blob_id = id.description_blob_id();
938        let content = match txn_tracker.created_blobs().get(&blob_id) {
939            Some(content) => content.clone(),
940            None => self.read_blob_content(blob_id).await?,
941        };
942        self.blob_used(txn_tracker, blob_id).await?;
943        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
944
945        let blob_ids = self
946            .check_bytecode_blobs(&description.module_id, txn_tracker)
947            .await?;
948        // We only remember to register the blobs that aren't recorded in `used_blobs`
949        // already.
950        for blob_id in blob_ids {
951            self.blob_used(txn_tracker, blob_id).await?;
952        }
953
954        self.check_required_applications(&description, txn_tracker)
955            .await?;
956
957        Ok(description)
958    }
959
960    /// Retrieves the recursive dependencies of applications and applies a topological sort.
961    pub async fn find_dependencies(
962        &mut self,
963        mut stack: Vec<ApplicationId>,
964        txn_tracker: &mut TransactionTracker,
965    ) -> Result<Vec<ApplicationId>, ExecutionError> {
966        // What we return at the end.
967        let mut result = Vec::new();
968        // The entries already inserted in `result`.
969        let mut sorted = HashSet::new();
970        // The entries for which dependencies have already been pushed once to the stack.
971        let mut seen = HashSet::new();
972
973        while let Some(id) = stack.pop() {
974            if sorted.contains(&id) {
975                continue;
976            }
977            if seen.contains(&id) {
978                // Second time we see this entry. It was last pushed just before its
979                // dependencies -- which are now fully sorted.
980                sorted.insert(id);
981                result.push(id);
982                continue;
983            }
984            // First time we see this entry:
985            // 1. Mark it so that its dependencies are no longer pushed to the stack.
986            seen.insert(id);
987            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
988            stack.push(id);
989            let app = self.describe_application(id, txn_tracker).await?;
990            for child in app.required_application_ids.iter().rev() {
991                if !seen.contains(child) {
992                    stack.push(*child);
993                }
994            }
995        }
996        Ok(result)
997    }
998
999    /// Records a blob that is used in this block. If this is the first use on this chain, creates
1000    /// an oracle response for it.
1001    pub(crate) async fn blob_used(
1002        &mut self,
1003        txn_tracker: &mut TransactionTracker,
1004        blob_id: BlobId,
1005    ) -> Result<bool, ExecutionError> {
1006        if self.used_blobs.contains(&blob_id).await? {
1007            return Ok(false); // Nothing to do.
1008        }
1009        self.used_blobs.insert(&blob_id)?;
1010        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1011        Ok(true)
1012    }
1013
1014    /// Records a blob that is published in this block. This does not create an oracle entry, and
1015    /// the blob can be used without using an oracle in the future on this chain.
1016    fn blob_published(
1017        &mut self,
1018        blob_id: &BlobId,
1019        txn_tracker: &mut TransactionTracker,
1020    ) -> Result<(), ExecutionError> {
1021        self.used_blobs.insert(blob_id)?;
1022        txn_tracker.add_published_blob(*blob_id);
1023        Ok(())
1024    }
1025
1026    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1027        match self.context().extra().get_blob(blob_id).await {
1028            Ok(Some(blob)) => Ok(blob.into()),
1029            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1030            Err(error) => Err(error.into()),
1031        }
1032    }
1033
1034    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1035        if self.context().extra().contains_blob(blob_id).await? {
1036            Ok(())
1037        } else {
1038            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1039        }
1040    }
1041
1042    async fn check_bytecode_blobs(
1043        &mut self,
1044        module_id: &ModuleId,
1045        txn_tracker: &TransactionTracker,
1046    ) -> Result<Vec<BlobId>, ExecutionError> {
1047        let blob_ids = module_id.bytecode_blob_ids();
1048
1049        let mut missing_blobs = Vec::new();
1050        for blob_id in &blob_ids {
1051            // First check if blob is present in created_blobs
1052            if txn_tracker.created_blobs().contains_key(blob_id) {
1053                continue; // Blob found in created_blobs, it's ok
1054            }
1055            // If not in created_blobs, check storage
1056            if !self.context().extra().contains_blob(*blob_id).await? {
1057                missing_blobs.push(*blob_id);
1058            }
1059        }
1060        ensure!(
1061            missing_blobs.is_empty(),
1062            ExecutionError::BlobsNotFound(missing_blobs)
1063        );
1064
1065        Ok(blob_ids)
1066    }
1067}