linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::collections::{BTreeMap, BTreeSet, HashSet};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    crypto::CryptoHash,
14    data_types::{
15        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
16        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
17    },
18    ensure, hex_debug,
19    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
20    ownership::{ChainOwnership, TimeoutConfig},
21};
22use linera_views::{
23    context::Context,
24    map_view::HashedMapView,
25    register_view::HashedRegisterView,
26    set_view::HashedSetView,
27    views::{ClonableView, HashableView, ReplaceContext, View},
28};
29use serde::{Deserialize, Serialize};
30
31#[cfg(test)]
32use crate::test_utils::SystemExecutionState;
33use crate::{
34    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
35    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
36    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
37};
38
/// The event stream name for new epochs and committees.
///
/// Events on this system stream are indexed by the epoch number. The payload written by
/// `AdminOperation::CreateCommittee` is the BCS-encoded hash of the committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// The event stream name for removed epochs.
///
/// Events on this system stream are indexed by the epoch number.
/// `AdminOperation::RemoveCommittee` writes them with an empty payload.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
43
/// Prometheus metrics for system operations.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The number of times the `SystemOperation::OpenChain` operation was executed.
    ///
    /// Registered lazily on first access, without any labels.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
60
/// A view accessing the execution state of the system of a chain.
///
/// NOTE(review): the field order is presumably significant for the derived view and hash
/// implementations -- confirm before reordering fields.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
    /// How the chain was created. May be unknown for inactive chains.
    pub description: HashedRegisterView<C, Option<ChainDescription>>,
    /// The number identifying the current configuration.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The admin of the chain.
    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees that we trust, indexed by epoch number.
    // Not using a `MapView` because the set of active committees is supposed to be
    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
    // (e.g. the `OpenChain` operation).
    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// Ownership of the chain.
    pub ownership: HashedRegisterView<C, ChainOwnership>,
    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
    ///
    /// This is the balance debited and credited for the `AccountOwner::CHAIN` account.
    pub balance: HashedRegisterView<C, Amount>,
    /// Balances attributed to a given owner.
    ///
    /// An entry is removed when a debit brings its balance down to zero.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// The timestamp of the most recent block.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether this chain has been closed.
    pub closed: HashedRegisterView<C, bool>,
    /// Permissions for applications on this chain.
    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
    /// Blobs that have been used or published on this chain.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// The event stream subscriptions of applications on this chain.
    ///
    /// Keyed by the publishing chain and the stream.
    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
}
92
93impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
94    type Target = SystemExecutionStateView<C2>;
95
96    async fn with_context(
97        &mut self,
98        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
99    ) -> Self::Target {
100        SystemExecutionStateView {
101            description: self.description.with_context(ctx.clone()).await,
102            epoch: self.epoch.with_context(ctx.clone()).await,
103            admin_id: self.admin_id.with_context(ctx.clone()).await,
104            committees: self.committees.with_context(ctx.clone()).await,
105            ownership: self.ownership.with_context(ctx.clone()).await,
106            balance: self.balance.with_context(ctx.clone()).await,
107            balances: self.balances.with_context(ctx.clone()).await,
108            timestamp: self.timestamp.with_context(ctx.clone()).await,
109            closed: self.closed.with_context(ctx.clone()).await,
110            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
111            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
112            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
113        }
114    }
115}
116
/// The applications subscribing to a particular stream, and the next event index.
///
/// One value is stored per `(ChainId, StreamId)` pair in
/// `SystemExecutionStateView::event_subscriptions`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct EventSubscriptions {
    /// The next event index, i.e. the total number of events in this stream that have already
    /// been processed by this chain.
    ///
    /// `SystemOperation::UpdateStreams` only accepts a strictly larger value than the one
    /// currently stored.
    pub next_index: u32,
    /// The applications that are subscribed to this stream.
    pub applications: BTreeSet<ApplicationId>,
}
126
/// The initial configuration for a new chain.
///
/// This is the payload of `SystemOperation::OpenChain`. It can be turned into an
/// [`InitialChainConfig`] with [`OpenChainConfig::init_chain_config`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial chain balance.
    pub balance: Amount,
    /// The initial application permissions.
    pub application_permissions: ApplicationPermissions,
}
137
138impl OpenChainConfig {
139    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
140    /// parameters.
141    pub fn init_chain_config(
142        &self,
143        epoch: Epoch,
144        min_active_epoch: Epoch,
145        max_active_epoch: Epoch,
146    ) -> InitialChainConfig {
147        InitialChainConfig {
148            application_permissions: self.application_permissions.clone(),
149            balance: self.balance,
150            epoch,
151            min_active_epoch,
152            max_active_epoch,
153            ownership: self.ownership.clone(),
154        }
155    }
156}
157
/// A system operation.
///
/// System operations are executed by `SystemExecutionStateView::execute_operation`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
    /// Transfers `amount` units of value from the given owner's account to the recipient.
    /// If `owner` is `AccountOwner::CHAIN`, the units are taken out of the chain balance.
    Transfer {
        /// The account to debit on this chain.
        owner: AccountOwner,
        /// The account to credit, possibly on another chain.
        recipient: Account,
        /// The number of units to transfer; must be greater than zero.
        amount: Amount,
    },
    /// Claims `amount` units of value from the given owner's account in the remote
    /// `target_id` chain. Depending on its configuration, the target chain may refuse to
    /// process the message. If `target_id` is the current chain, the claim is processed
    /// locally instead of sending a message.
    Claim {
        /// The account to debit on the target chain.
        owner: AccountOwner,
        /// The chain holding the account to debit.
        target_id: ChainId,
        /// The account to credit.
        recipient: Account,
        /// The number of units to claim; must be greater than zero.
        amount: Amount,
    },
    /// Creates (or activates) a new chain.
    /// This will automatically subscribe to the future committees created by `admin_id`.
    OpenChain(OpenChainConfig),
    /// Closes the chain.
    CloseChain,
    /// Changes the ownership of the chain.
    ChangeOwnership {
        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The regular owners, with their weights that determine how often they are round leader.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
        multi_leader_rounds: u32,
        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
        /// This should only be `true` on chains with restrictive application permissions and an
        /// application-based mechanism to select block proposers.
        open_multi_leader_rounds: bool,
        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
        timeout_config: TimeoutConfig,
    },
    /// Changes the application permissions configuration on this chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists. Otherwise the block fails.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        /// The module the application is created from.
        module_id: ModuleId,
        /// The application parameters, as raw bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The instantiation argument, as raw bytes. It is returned to the caller of
        /// `execute_operation` together with the new application ID.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications that the new application requires.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes an event about a new epoch and committee.
    /// The epoch must be exactly one higher than the chain's current epoch.
    ProcessNewEpoch(Epoch),
    /// Processes an event about a removed epoch and committee.
    ProcessRemovedEpoch(Epoch),
    /// Updates the event stream trackers.
    /// Each entry is a publishing chain, a stream, and the new next event index for it.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
228
/// Operations that are only allowed on the admin chain.
///
/// Executing one of these on any other chain fails with
/// `ExecutionError::AdminOperationOnNonAdminChain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
    /// Publishes a new committee as a blob. This can be assigned to an epoch using
    /// [`AdminOperation::CreateCommittee`] in a later block.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
    /// [`SystemOperation::ProcessNewEpoch`].
    ///
    /// `epoch` must be exactly one higher than the admin chain's current epoch, and
    /// `blob_hash` is the hash of the committee blob to read.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
    /// re-certified) by a block certified by a recent committee.
    ///
    /// Fails with `ExecutionError::InvalidCommitteeRemoval` if there is no committee for `epoch`.
    RemoveCommittee { epoch: Epoch },
}
243
/// A system message meant to be executed on a remote chain.
///
/// System messages are executed by `SystemExecutionStateView::execute_message`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target` -- unless the message is
    /// bouncing, in which case `source` is credited instead.
    Credit {
        /// The account to credit on the receiving chain.
        target: AccountOwner,
        /// The number of units to credit.
        amount: Amount,
        /// The account that was debited on the sending chain.
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account and starts a transfer to credit
    /// the recipient. The message must be properly authenticated. Receiver chains may
    /// refuse it depending on their configuration.
    Withdraw {
        /// The account to debit on the receiving chain.
        owner: AccountOwner,
        /// The number of units to withdraw.
        amount: Amount,
        /// The account to credit with the withdrawn units.
        recipient: Account,
    },
}
263
/// A query to the system state.
///
/// The query carries no parameters; it is answered with a [`SystemResponse`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
267
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the chain that was queried.
    pub chain_id: ChainId,
    /// The chain balance at the time of the query.
    pub balance: Amount,
}
274
/// Optional user message attached to a transfer.
///
/// When present, the message is exactly 32 bytes long.
/// [`UserData::from_option_string`] pads shorter strings with ASCII spaces.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
278
279impl UserData {
280    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
281        // Convert the Option<String> to Option<[u8; 32]>
282        let option_array = match opt_str {
283            Some(s) => {
284                // Convert the String to a Vec<u8>
285                let vec = s.into_bytes();
286                if vec.len() <= 32 {
287                    // Create an array from the Vec<u8>
288                    let mut array = [b' '; 32];
289
290                    // Copy bytes from the vector into the array
291                    let len = vec.len().min(32);
292                    array[..len].copy_from_slice(&vec[..len]);
293
294                    Some(array)
295                } else {
296                    return Err(vec.len());
297                }
298            }
299            None => None,
300        };
301
302        // Return the UserData with the converted Option<[u8; 32]>
303        Ok(UserData(option_array))
304    }
305}
306
/// The result of creating an application, as returned by `create_application`.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
}
311
312impl<C> SystemExecutionStateView<C>
313where
314    C: Context + Clone + Send + Sync + 'static,
315    C::Extra: ExecutionRuntimeContext,
316{
317    /// Invariant for the states of active chains.
318    pub fn is_active(&self) -> bool {
319        self.description.get().is_some()
320            && self.ownership.get().is_active()
321            && self.current_committee().is_some()
322            && self.admin_id.get().is_some()
323    }
324
325    /// Returns the current committee, if any.
326    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
327        let epoch = self.epoch.get();
328        let committee = self.committees.get().get(epoch)?;
329        Some((*epoch, committee))
330    }
331
332    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
333        match self.context().extra().get_event(event_id.clone()).await? {
334            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
335            Some(vec) => Ok(vec),
336        }
337    }
338
    /// Executes the sender's side of a system operation on this chain.
    ///
    /// State changes are applied to `self`. Outgoing messages, events, streams to process and
    /// oracle queries go through `txn_tracker`.
    ///
    /// Returns `Some((app_id, instantiation_argument))` if the operation was
    /// [`SystemOperation::CreateApplication`], and `None` for every other operation.
    ///
    /// # Errors
    ///
    /// Propagates the [`ExecutionError`] of whichever step fails. In particular, an
    /// [`SystemOperation::Admin`] operation fails with
    /// [`ExecutionError::AdminOperationOnNonAdminChain`] unless this chain is the admin chain.
    pub async fn execute_operation(
        &mut self,
        context: OperationContext,
        operation: SystemOperation,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
        use SystemOperation::*;
        // Only set by `CreateApplication`.
        let mut new_application = None;
        match operation {
            OpenChain(config) => {
                // The ID of the new chain is not needed here.
                let _chain_id = self
                    .open_chain(
                        config,
                        context.chain_id,
                        context.height,
                        context.timestamp,
                        txn_tracker,
                    )
                    .await?;
                #[cfg(with_metrics)]
                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
            }
            ChangeOwnership {
                super_owners,
                owners,
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            } => {
                // The previous ownership is replaced as a whole.
                self.ownership.set(ChainOwnership {
                    super_owners: super_owners.into_iter().collect(),
                    owners: owners.into_iter().collect(),
                    multi_leader_rounds,
                    open_multi_leader_rounds,
                    timeout_config,
                });
            }
            ChangeApplicationPermissions(application_permissions) => {
                self.application_permissions.set(application_permissions);
            }
            CloseChain => self.close_chain(),
            Transfer {
                owner,
                amount,
                recipient,
            } => {
                // System operations are never authenticated by an application.
                let maybe_message = self
                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Claim {
                owner,
                target_id,
                recipient,
                amount,
            } => {
                let maybe_message = self
                    .claim(
                        context.authenticated_signer,
                        None,
                        owner,
                        target_id,
                        recipient,
                        amount,
                    )
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Admin(admin_operation) => {
                // Admin operations are only valid if this chain is the admin chain.
                ensure!(
                    *self.admin_id.get() == Some(context.chain_id),
                    ExecutionError::AdminOperationOnNonAdminChain
                );
                match admin_operation {
                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
                        self.blob_published(
                            &BlobId::new(blob_hash, BlobType::Committee),
                            txn_tracker,
                        )?;
                    }
                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
                        self.check_next_epoch(epoch)?;
                        // The committee is deserialized from the committee blob.
                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                        let committee =
                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                        self.blob_used(txn_tracker, blob_id).await?;
                        self.committees.get_mut().insert(epoch, committee);
                        self.epoch.set(epoch);
                        // Announce the new epoch: the event index is the epoch number and
                        // the payload is the BCS-encoded committee blob hash.
                        txn_tracker.add_event(
                            StreamId::system(EPOCH_STREAM_NAME),
                            epoch.0,
                            bcs::to_bytes(&blob_hash)?,
                        );
                    }
                    AdminOperation::RemoveCommittee { epoch } => {
                        ensure!(
                            self.committees.get_mut().remove(&epoch).is_some(),
                            ExecutionError::InvalidCommitteeRemoval
                        );
                        // Announce the removal with an empty payload.
                        txn_tracker.add_event(
                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                            epoch.0,
                            vec![],
                        );
                    }
                }
            }
            PublishModule { module_id } => {
                for blob_id in module_id.bytecode_blob_ids() {
                    self.blob_published(&blob_id, txn_tracker)?;
                }
            }
            CreateApplication {
                module_id,
                parameters,
                instantiation_argument,
                required_application_ids,
            } => {
                let CreateApplicationResult { app_id } = self
                    .create_application(
                        context.chain_id,
                        context.height,
                        module_id,
                        parameters,
                        required_application_ids,
                        txn_tracker,
                    )
                    .await?;
                // The instantiation argument is handed back to the caller with the new ID.
                new_application = Some((app_id, instantiation_argument));
            }
            PublishDataBlob { blob_hash } => {
                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
            }
            VerifyBlob { blob_id } => {
                self.assert_blob_exists(blob_id).await?;
                // NOTE(review): the argument is presumably the number of bytes read -- confirm.
                resource_controller
                    .with_state(self)
                    .await?
                    .track_blob_read(0)?;
                self.blob_used(txn_tracker, blob_id).await?;
            }
            ProcessNewEpoch(epoch) => {
                self.check_next_epoch(epoch)?;
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event on the admin chain's epoch stream, indexed by the epoch number.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                let bytes = txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(event_id.clone(), bytes))
                    })
                    .await?
                    .to_event(&event_id)?;
                // The event payload is the BCS-encoded hash of the committee blob.
                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                self.blob_used(txn_tracker, blob_id).await?;
                self.committees.get_mut().insert(epoch, committee);
                self.epoch.set(epoch);
            }
            ProcessRemovedEpoch(epoch) => {
                ensure!(
                    self.committees.get_mut().remove(&epoch).is_some(),
                    ExecutionError::InvalidCommitteeRemoval
                );
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // The oracle response is not used any further here.
                txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(event_id, bytes))
                    })
                    .await?;
            }
            UpdateStreams(streams) => {
                // Missing events are collected for all streams and reported together below.
                let mut missing_events = Vec::new();
                for (chain_id, stream_id, next_index) in streams {
                    let subscriptions = self
                        .event_subscriptions
                        .get_mut_or_default(&(chain_id, stream_id.clone()))
                        .await?;
                    // The tracker may only move forward.
                    ensure!(
                        subscriptions.next_index < next_index,
                        ExecutionError::OutdatedUpdateStreams
                    );
                    // Every subscriber gets the range from the old to the new next index.
                    for application_id in &subscriptions.applications {
                        txn_tracker.add_stream_to_process(
                            *application_id,
                            chain_id,
                            stream_id.clone(),
                            subscriptions.next_index,
                            next_index,
                        );
                    }
                    subscriptions.next_index = next_index;
                    // The index of the last event covered by this update.
                    let index = next_index
                        .checked_sub(1)
                        .ok_or(ArithmeticError::Underflow)?;
                    let event_id = EventId {
                        chain_id,
                        stream_id,
                        index,
                    };
                    let extra = self.context().extra();
                    txn_tracker
                        .oracle(|| async {
                            if !extra.contains_event(event_id.clone()).await? {
                                missing_events.push(event_id.clone());
                            }
                            Ok(OracleResponse::EventExists(event_id))
                        })
                        .await?;
                }
                ensure!(
                    missing_events.is_empty(),
                    ExecutionError::EventsNotFound(missing_events)
                );
            }
        }

        Ok(new_application)
    }
577
578    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
579    /// epoch.
580    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
581        let expected = self.epoch.get().try_add_one()?;
582        ensure!(
583            provided == expected,
584            ExecutionError::InvalidCommitteeEpoch { provided, expected }
585        );
586        Ok(())
587    }
588
589    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
590        if owner == &AccountOwner::CHAIN {
591            let new_balance = self.balance.get().saturating_add(amount);
592            self.balance.set(new_balance);
593        } else {
594            let balance = self.balances.get_mut_or_default(owner).await?;
595            *balance = balance.saturating_add(amount);
596        }
597        Ok(())
598    }
599
600    async fn credit_or_send_message(
601        &mut self,
602        source: AccountOwner,
603        recipient: Account,
604        amount: Amount,
605    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
606        let source_chain_id = self.context().extra().chain_id();
607        if recipient.chain_id == source_chain_id {
608            // Handle same-chain transfer locally.
609            let target = recipient.owner;
610            self.credit(&target, amount).await?;
611            Ok(None)
612        } else {
613            // Handle cross-chain transfer with message.
614            let message = SystemMessage::Credit {
615                amount,
616                source,
617                target: recipient.owner,
618            };
619            Ok(Some(
620                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
621            ))
622        }
623    }
624
625    pub async fn transfer(
626        &mut self,
627        authenticated_signer: Option<AccountOwner>,
628        authenticated_application_id: Option<ApplicationId>,
629        source: AccountOwner,
630        recipient: Account,
631        amount: Amount,
632    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
633        if source == AccountOwner::CHAIN {
634            ensure!(
635                authenticated_signer.is_some()
636                    && self
637                        .ownership
638                        .get()
639                        .verify_owner(&authenticated_signer.unwrap()),
640                ExecutionError::UnauthenticatedTransferOwner
641            );
642        } else {
643            ensure!(
644                authenticated_signer == Some(source)
645                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
646                ExecutionError::UnauthenticatedTransferOwner
647            );
648        }
649        ensure!(
650            amount > Amount::ZERO,
651            ExecutionError::IncorrectTransferAmount
652        );
653        self.debit(&source, amount).await?;
654        self.credit_or_send_message(source, recipient, amount).await
655    }
656
657    pub async fn claim(
658        &mut self,
659        authenticated_signer: Option<AccountOwner>,
660        authenticated_application_id: Option<ApplicationId>,
661        source: AccountOwner,
662        target_id: ChainId,
663        recipient: Account,
664        amount: Amount,
665    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
666        ensure!(
667            authenticated_signer == Some(source)
668                || authenticated_application_id.map(AccountOwner::from) == Some(source),
669            ExecutionError::UnauthenticatedClaimOwner
670        );
671        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
672
673        let current_chain_id = self.context().extra().chain_id();
674        if target_id == current_chain_id {
675            // Handle same-chain claim locally by processing the withdraw operation directly
676            self.debit(&source, amount).await?;
677            self.credit_or_send_message(source, recipient, amount).await
678        } else {
679            // Handle cross-chain claim with Withdraw message
680            let message = SystemMessage::Withdraw {
681                amount,
682                owner: source,
683                recipient,
684            };
685            Ok(Some(
686                OutgoingMessage::new(target_id, message)
687                    .with_authenticated_signer(authenticated_signer),
688            ))
689        }
690    }
691
692    /// Debits an [`Amount`] of tokens from an account's balance.
693    async fn debit(
694        &mut self,
695        account: &AccountOwner,
696        amount: Amount,
697    ) -> Result<(), ExecutionError> {
698        let balance = if account == &AccountOwner::CHAIN {
699            self.balance.get_mut()
700        } else {
701            self.balances.get_mut(account).await?.ok_or_else(|| {
702                ExecutionError::InsufficientBalance {
703                    balance: Amount::ZERO,
704                    account: *account,
705                }
706            })?
707        };
708
709        balance
710            .try_sub_assign(amount)
711            .map_err(|_| ExecutionError::InsufficientBalance {
712                balance: *balance,
713                account: *account,
714            })?;
715
716        if account != &AccountOwner::CHAIN && balance.is_zero() {
717            self.balances.remove(account)?;
718        }
719
720        Ok(())
721    }
722
723    /// Executes a cross-chain message that represents the recipient's side of an operation.
724    pub async fn execute_message(
725        &mut self,
726        context: MessageContext,
727        message: SystemMessage,
728    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
729        let mut outcome = Vec::new();
730        use SystemMessage::*;
731        match message {
732            Credit {
733                amount,
734                source,
735                target,
736            } => {
737                let receiver = if context.is_bouncing { source } else { target };
738                self.credit(&receiver, amount).await?;
739            }
740            Withdraw {
741                amount,
742                owner,
743                recipient,
744            } => {
745                self.debit(&owner, amount).await?;
746                if let Some(message) = self
747                    .credit_or_send_message(owner, recipient, amount)
748                    .await?
749                {
750                    outcome.push(message);
751                }
752            }
753        }
754        Ok(outcome)
755    }
756
757    /// Initializes the system application state on a newly opened chain.
758    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
759    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
760        if self.description.get().is_some() {
761            // already initialized
762            return Ok(true);
763        }
764        let description_blob = self
765            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
766            .await?;
767        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
768        let InitialChainConfig {
769            ownership,
770            epoch,
771            balance,
772            min_active_epoch,
773            max_active_epoch,
774            application_permissions,
775        } = description.config().clone();
776        self.timestamp.set(description.timestamp());
777        self.description.set(Some(description));
778        self.epoch.set(epoch);
779        let committees = self
780            .context()
781            .extra()
782            .committees_for(min_active_epoch..=max_active_epoch)
783            .await?;
784        self.committees.set(committees);
785        let admin_id = self
786            .context()
787            .extra()
788            .get_network_description()
789            .await?
790            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
791            .admin_chain_id;
792        self.admin_id.set(Some(admin_id));
793        self.ownership.set(ownership);
794        self.balance.set(balance);
795        self.application_permissions.set(application_permissions);
796        Ok(false)
797    }
798
799    pub fn handle_query(
800        &mut self,
801        context: QueryContext,
802        _query: SystemQuery,
803    ) -> QueryOutcome<SystemResponse> {
804        let response = SystemResponse {
805            chain_id: context.chain_id,
806            balance: *self.balance.get(),
807        };
808        QueryOutcome {
809            response,
810            operations: vec![],
811        }
812    }
813
814    /// Returns the messages to open a new chain, and subtracts the new chain's balance
815    /// from this chain's.
816    pub async fn open_chain(
817        &mut self,
818        config: OpenChainConfig,
819        parent: ChainId,
820        block_height: BlockHeight,
821        timestamp: Timestamp,
822        txn_tracker: &mut TransactionTracker,
823    ) -> Result<ChainId, ExecutionError> {
824        let chain_index = txn_tracker.next_chain_index();
825        let chain_origin = ChainOrigin::Child {
826            parent,
827            block_height,
828            chain_index,
829        };
830        let init_chain_config = config.init_chain_config(
831            *self.epoch.get(),
832            self.committees
833                .get()
834                .keys()
835                .min()
836                .copied()
837                .unwrap_or(Epoch::ZERO),
838            self.committees
839                .get()
840                .keys()
841                .max()
842                .copied()
843                .unwrap_or(Epoch::ZERO),
844        );
845        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
846        let child_id = chain_description.id();
847        self.debit(&AccountOwner::CHAIN, config.balance).await?;
848        let blob = Blob::new_chain_description(&chain_description);
849        txn_tracker.add_created_blob(blob);
850        Ok(child_id)
851    }
852
853    pub fn close_chain(&mut self) {
854        self.closed.set(true);
855    }
856
857    pub async fn create_application(
858        &mut self,
859        chain_id: ChainId,
860        block_height: BlockHeight,
861        module_id: ModuleId,
862        parameters: Vec<u8>,
863        required_application_ids: Vec<ApplicationId>,
864        txn_tracker: &mut TransactionTracker,
865    ) -> Result<CreateApplicationResult, ExecutionError> {
866        let application_index = txn_tracker.next_application_index();
867
868        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
869        // We only remember to register the blobs that aren't recorded in `used_blobs`
870        // already.
871        for blob_id in blob_ids {
872            self.blob_used(txn_tracker, blob_id).await?;
873        }
874
875        let application_description = ApplicationDescription {
876            module_id,
877            creator_chain_id: chain_id,
878            block_height,
879            application_index,
880            parameters,
881            required_application_ids,
882        };
883        self.check_required_applications(&application_description, txn_tracker)
884            .await?;
885
886        let blob = Blob::new_application_description(&application_description);
887        self.used_blobs.insert(&blob.id())?;
888        txn_tracker.add_created_blob(blob);
889
890        Ok(CreateApplicationResult {
891            app_id: ApplicationId::from(&application_description),
892        })
893    }
894
895    async fn check_required_applications(
896        &mut self,
897        application_description: &ApplicationDescription,
898        txn_tracker: &mut TransactionTracker,
899    ) -> Result<(), ExecutionError> {
900        // Make sure that referenced applications IDs have been registered.
901        for required_id in &application_description.required_application_ids {
902            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
903        }
904        Ok(())
905    }
906
907    /// Retrieves an application's description.
908    pub async fn describe_application(
909        &mut self,
910        id: ApplicationId,
911        txn_tracker: &mut TransactionTracker,
912    ) -> Result<ApplicationDescription, ExecutionError> {
913        let blob_id = id.description_blob_id();
914        let content = match txn_tracker.created_blobs().get(&blob_id) {
915            Some(content) => content.clone(),
916            None => self.read_blob_content(blob_id).await?,
917        };
918        self.blob_used(txn_tracker, blob_id).await?;
919        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
920
921        let blob_ids = self
922            .check_bytecode_blobs(&description.module_id, txn_tracker)
923            .await?;
924        // We only remember to register the blobs that aren't recorded in `used_blobs`
925        // already.
926        for blob_id in blob_ids {
927            self.blob_used(txn_tracker, blob_id).await?;
928        }
929
930        self.check_required_applications(&description, txn_tracker)
931            .await?;
932
933        Ok(description)
934    }
935
936    /// Retrieves the recursive dependencies of applications and applies a topological sort.
937    pub async fn find_dependencies(
938        &mut self,
939        mut stack: Vec<ApplicationId>,
940        txn_tracker: &mut TransactionTracker,
941    ) -> Result<Vec<ApplicationId>, ExecutionError> {
942        // What we return at the end.
943        let mut result = Vec::new();
944        // The entries already inserted in `result`.
945        let mut sorted = HashSet::new();
946        // The entries for which dependencies have already been pushed once to the stack.
947        let mut seen = HashSet::new();
948
949        while let Some(id) = stack.pop() {
950            if sorted.contains(&id) {
951                continue;
952            }
953            if seen.contains(&id) {
954                // Second time we see this entry. It was last pushed just before its
955                // dependencies -- which are now fully sorted.
956                sorted.insert(id);
957                result.push(id);
958                continue;
959            }
960            // First time we see this entry:
961            // 1. Mark it so that its dependencies are no longer pushed to the stack.
962            seen.insert(id);
963            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
964            stack.push(id);
965            let app = self.describe_application(id, txn_tracker).await?;
966            for child in app.required_application_ids.iter().rev() {
967                if !seen.contains(child) {
968                    stack.push(*child);
969                }
970            }
971        }
972        Ok(result)
973    }
974
975    /// Records a blob that is used in this block. If this is the first use on this chain, creates
976    /// an oracle response for it.
977    pub(crate) async fn blob_used(
978        &mut self,
979        txn_tracker: &mut TransactionTracker,
980        blob_id: BlobId,
981    ) -> Result<bool, ExecutionError> {
982        if self.used_blobs.contains(&blob_id).await? {
983            return Ok(false); // Nothing to do.
984        }
985        self.used_blobs.insert(&blob_id)?;
986        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
987        Ok(true)
988    }
989
990    /// Records a blob that is published in this block. This does not create an oracle entry, and
991    /// the blob can be used without using an oracle in the future on this chain.
992    fn blob_published(
993        &mut self,
994        blob_id: &BlobId,
995        txn_tracker: &mut TransactionTracker,
996    ) -> Result<(), ExecutionError> {
997        self.used_blobs.insert(blob_id)?;
998        txn_tracker.add_published_blob(*blob_id);
999        Ok(())
1000    }
1001
1002    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1003        match self.context().extra().get_blob(blob_id).await {
1004            Ok(Some(blob)) => Ok(blob.into()),
1005            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1006            Err(error) => Err(error.into()),
1007        }
1008    }
1009
1010    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1011        if self.context().extra().contains_blob(blob_id).await? {
1012            Ok(())
1013        } else {
1014            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1015        }
1016    }
1017
1018    async fn check_bytecode_blobs(
1019        &mut self,
1020        module_id: &ModuleId,
1021        txn_tracker: &TransactionTracker,
1022    ) -> Result<Vec<BlobId>, ExecutionError> {
1023        let blob_ids = module_id.bytecode_blob_ids();
1024
1025        let mut missing_blobs = Vec::new();
1026        for blob_id in &blob_ids {
1027            // First check if blob is present in created_blobs
1028            if txn_tracker.created_blobs().contains_key(blob_id) {
1029                continue; // Blob found in created_blobs, it's ok
1030            }
1031            // If not in created_blobs, check storage
1032            if !self.context().extra().contains_blob(*blob_id).await? {
1033                missing_blobs.push(*blob_id);
1034            }
1035        }
1036        ensure!(
1037            missing_blobs.is_empty(),
1038            ExecutionError::BlobsNotFound(missing_blobs)
1039        );
1040
1041        Ok(blob_ids)
1042    }
1043}