linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
// The unit tests for this module live in a separate file and are only compiled in test builds.
#[cfg(test)]
#[path = "./unit_tests/system_tests.rs"]
mod tests;
8
9use std::collections::{BTreeMap, BTreeSet, HashSet};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    crypto::CryptoHash,
14    data_types::{
15        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
16        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
17    },
18    ensure, hex_debug,
19    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
20    ownership::{ChainOwnership, TimeoutConfig},
21};
22use linera_views::{
23    context::Context,
24    map_view::HashedMapView,
25    register_view::HashedRegisterView,
26    set_view::HashedSetView,
27    views::{ClonableView, HashableView, ReplaceContext, View},
28};
29use serde::{Deserialize, Serialize};
30
31#[cfg(test)]
32use crate::test_utils::SystemExecutionState;
33use crate::{
34    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
35    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
36    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
37};
38
/// The event stream name for new epochs and committees.
///
/// [`AdminOperation::CreateCommittee`] publishes one event per epoch on this stream, using the
/// epoch number as the event index and the BCS-serialized committee blob hash as the value.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// The event stream name for removed epochs.
///
/// [`AdminOperation::RemoveCommittee`] publishes one event per removed epoch on this stream,
/// using the epoch number as the event index and an empty value.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
43
/// Prometheus metrics recorded by the system execution state. Only compiled when metrics are
/// enabled.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The number of times the `OpenChain` system operation was executed.
    ///
    /// The counter is registered lazily on first access and has no labels.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
60
/// A view accessing the execution state of the system of a chain.
///
/// Every field is a hashed sub-view, so the whole structure derives `HashableView`.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
    /// How the chain was created. May be unknown for inactive chains.
    pub description: HashedRegisterView<C, Option<ChainDescription>>,
    /// The number identifying the current configuration.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The admin of the chain.
    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees that we trust, indexed by epoch number.
    // Not using a `MapView` because the set of active committees is supposed to be
    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
    // (e.g. the `OpenChain` operation).
    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// Ownership of the chain.
    pub ownership: HashedRegisterView<C, ChainOwnership>,
    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
    pub balance: HashedRegisterView<C, Amount>,
    /// Balances attributed to a given owner.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// The timestamp of the most recent block.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether this chain has been closed.
    pub closed: HashedRegisterView<C, bool>,
    /// Permissions for applications on this chain.
    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
    /// Blobs that have been used or published on this chain.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// The event stream subscriptions of applications on this chain, keyed by the publishing
    /// chain and the stream.
    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
    /// The number of events in the streams that this chain is writing to.
    pub stream_event_counts: HashedMapView<C, StreamId, u32>,
}
94
95impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
96    type Target = SystemExecutionStateView<C2>;
97
98    async fn with_context(
99        &mut self,
100        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
101    ) -> Self::Target {
102        SystemExecutionStateView {
103            description: self.description.with_context(ctx.clone()).await,
104            epoch: self.epoch.with_context(ctx.clone()).await,
105            admin_id: self.admin_id.with_context(ctx.clone()).await,
106            committees: self.committees.with_context(ctx.clone()).await,
107            ownership: self.ownership.with_context(ctx.clone()).await,
108            balance: self.balance.with_context(ctx.clone()).await,
109            balances: self.balances.with_context(ctx.clone()).await,
110            timestamp: self.timestamp.with_context(ctx.clone()).await,
111            closed: self.closed.with_context(ctx.clone()).await,
112            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
113            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
114            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
115            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
116        }
117    }
118}
119
/// The applications subscribing to a particular stream, and the next event index.
///
/// The default value has no subscribed applications and a `next_index` of zero.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct EventSubscriptions {
    /// The next event index, i.e. the total number of events in this stream that have already
    /// been processed by this chain.
    pub next_index: u32,
    /// The applications that are subscribed to this stream.
    pub applications: BTreeSet<ApplicationId>,
}
129
/// The initial configuration for a new chain.
///
/// This is the payload of [`SystemOperation::OpenChain`]; it can be turned into a full
/// [`InitialChainConfig`] with [`OpenChainConfig::init_chain_config`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial chain balance.
    pub balance: Amount,
    /// The initial application permissions.
    pub application_permissions: ApplicationPermissions,
}
140
141impl OpenChainConfig {
142    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
143    /// parameters.
144    pub fn init_chain_config(
145        &self,
146        epoch: Epoch,
147        min_active_epoch: Epoch,
148        max_active_epoch: Epoch,
149    ) -> InitialChainConfig {
150        InitialChainConfig {
151            application_permissions: self.application_permissions.clone(),
152            balance: self.balance,
153            epoch,
154            min_active_epoch,
155            max_active_epoch,
156            ownership: self.ownership.clone(),
157        }
158    }
159}
160
/// A system operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
    /// Transfers `amount` units of value from the given owner's account to the recipient.
    /// If `owner` is [`AccountOwner::CHAIN`], the units are taken out of the chain's own
    /// (unattributed) balance.
    Transfer {
        /// The account to debit.
        owner: AccountOwner,
        /// The account to credit, possibly on another chain.
        recipient: Account,
        /// The amount to transfer. Must be greater than zero.
        amount: Amount,
    },
    /// Claims `amount` units of value from the given owner's account in the remote
    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
    /// process the message.
    Claim {
        /// The account to debit on the target chain.
        owner: AccountOwner,
        /// The chain on which the owner's account is debited.
        target_id: ChainId,
        /// The account to credit, possibly on another chain.
        recipient: Account,
        /// The amount to claim. Must be greater than zero.
        amount: Amount,
    },
    /// Creates (or activates) a new chain.
    /// This will automatically subscribe to the future committees created by `admin_id`.
    OpenChain(OpenChainConfig),
    /// Closes the chain.
    CloseChain,
    /// Changes the ownership of the chain.
    ChangeOwnership {
        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The regular owners, with their weights that determine how often they are round leader.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
        multi_leader_rounds: u32,
        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
        /// This should only be `true` on chains with restrictive application permissions and an
        /// application-based mechanism to select block proposers.
        open_multi_leader_rounds: bool,
        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
        timeout_config: TimeoutConfig,
    },
    /// Changes the application permissions configuration on this chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists. Otherwise the block fails.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        /// The module that the new application is created from.
        module_id: ModuleId,
        /// The application parameters, as raw bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The argument for the application's instantiation, as raw bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications that the new application requires.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes an event about a new epoch and committee.
    ProcessNewEpoch(Epoch),
    /// Processes an event about a removed epoch and committee.
    ProcessRemovedEpoch(Epoch),
    /// Updates the event stream trackers. Each entry is a publishing chain, a stream, and the
    /// new next event index for that stream.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
231
/// Operations that are only allowed on the admin chain.
///
/// Executing any of these on a chain that is not its own admin chain fails with
/// `ExecutionError::AdminOperationOnNonAdminChain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
    /// Publishes a new committee as a blob. This can be assigned to an epoch using
    /// [`AdminOperation::CreateCommittee`] in a later block.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
    /// [`SystemOperation::ProcessNewEpoch`].
    ///
    /// The `epoch` must be exactly one higher than the admin chain's current epoch, and
    /// `blob_hash` identifies the committee blob.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
    /// re-certified) by a block certified by a recent committee.
    RemoveCommittee { epoch: Epoch },
}
246
/// A system message meant to be executed on a remote chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target` -- unless the message is
    /// bouncing, in which case `source` is credited instead.
    Credit {
        /// The account to credit on the receiving chain.
        target: AccountOwner,
        /// The amount to credit.
        amount: Amount,
        /// The account that was debited; it is refunded if the message bounces.
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account and starts a transfer to credit
    /// the recipient. The message must be properly authenticated. Receiver chains may
    /// refuse it depending on their configuration.
    Withdraw {
        /// The account to debit on the receiving chain.
        owner: AccountOwner,
        /// The amount to withdraw.
        amount: Amount,
        /// The account to credit with the withdrawn amount.
        recipient: Account,
    },
}
266
/// A query to the system state.
///
/// The query carries no parameters; it is answered with a [`SystemResponse`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
270
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the chain.
    pub chain_id: ChainId,
    /// A balance of the chain. NOTE(review): presumably the chain's unattributed balance --
    /// confirm against `handle_query`.
    pub balance: Amount,
}
277
/// Optional user message attached to a transfer.
///
/// When present, the message is exactly 32 bytes long; see
/// [`UserData::from_option_string`] for how shorter strings are padded.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
281
282impl UserData {
283    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
284        // Convert the Option<String> to Option<[u8; 32]>
285        let option_array = match opt_str {
286            Some(s) => {
287                // Convert the String to a Vec<u8>
288                let vec = s.into_bytes();
289                if vec.len() <= 32 {
290                    // Create an array from the Vec<u8>
291                    let mut array = [b' '; 32];
292
293                    // Copy bytes from the vector into the array
294                    let len = vec.len().min(32);
295                    array[..len].copy_from_slice(&vec[..len]);
296
297                    Some(array)
298                } else {
299                    return Err(vec.len());
300                }
301            }
302            None => None,
303        };
304
305        // Return the UserData with the converted Option<[u8; 32]>
306        Ok(UserData(option_array))
307    }
308}
309
/// The result of creating a new application.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
}
314
315impl<C> SystemExecutionStateView<C>
316where
317    C: Context + Clone + Send + Sync + 'static,
318    C::Extra: ExecutionRuntimeContext,
319{
320    /// Invariant for the states of active chains.
321    pub fn is_active(&self) -> bool {
322        self.description.get().is_some()
323            && self.ownership.get().is_active()
324            && self.current_committee().is_some()
325            && self.admin_id.get().is_some()
326    }
327
328    /// Returns the current committee, if any.
329    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
330        let epoch = self.epoch.get();
331        let committee = self.committees.get().get(epoch)?;
332        Some((*epoch, committee))
333    }
334
335    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
336        match self.context().extra().get_event(event_id.clone()).await? {
337            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
338            Some(vec) => Ok(vec),
339        }
340    }
341
    /// Executes the sender's side of an operation and returns a list of actions to be
    /// taken.
    ///
    /// Outgoing messages, events, oracle responses and streams to process are recorded in
    /// `txn_tracker`. `resource_controller` is only used to track the blob read performed by
    /// [`SystemOperation::VerifyBlob`].
    ///
    /// Returns `Some((application_id, instantiation_argument))` if the operation was
    /// [`SystemOperation::CreateApplication`], and `None` for every other operation.
    pub async fn execute_operation(
        &mut self,
        context: OperationContext,
        operation: SystemOperation,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
        use SystemOperation::*;
        // Only set by `CreateApplication`.
        let mut new_application = None;
        match operation {
            OpenChain(config) => {
                let _chain_id = self
                    .open_chain(
                        config,
                        context.chain_id,
                        context.height,
                        context.timestamp,
                        txn_tracker,
                    )
                    .await?;
                #[cfg(with_metrics)]
                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
            }
            ChangeOwnership {
                super_owners,
                owners,
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            } => {
                // The whole ownership configuration is replaced.
                self.ownership.set(ChainOwnership {
                    super_owners: super_owners.into_iter().collect(),
                    owners: owners.into_iter().collect(),
                    multi_leader_rounds,
                    open_multi_leader_rounds,
                    timeout_config,
                });
            }
            ChangeApplicationPermissions(application_permissions) => {
                self.application_permissions.set(application_permissions);
            }
            CloseChain => self.close_chain(),
            Transfer {
                owner,
                amount,
                recipient,
            } => {
                // System operations are never authenticated by an application, hence `None`.
                let maybe_message = self
                    .transfer(context.authenticated_owner, None, owner, recipient, amount)
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Claim {
                owner,
                target_id,
                recipient,
                amount,
            } => {
                // System operations are never authenticated by an application, hence `None`.
                let maybe_message = self
                    .claim(
                        context.authenticated_owner,
                        None,
                        owner,
                        target_id,
                        recipient,
                        amount,
                    )
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Admin(admin_operation) => {
                // Admin operations are only valid on the chain that is its own admin chain.
                ensure!(
                    *self.admin_id.get() == Some(context.chain_id),
                    ExecutionError::AdminOperationOnNonAdminChain
                );
                match admin_operation {
                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
                        self.blob_published(
                            &BlobId::new(blob_hash, BlobType::Committee),
                            txn_tracker,
                        )?;
                    }
                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
                        self.check_next_epoch(epoch)?;
                        // The committee is read from the blob with the given hash.
                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                        let committee =
                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                        self.blob_used(txn_tracker, blob_id).await?;
                        self.committees.get_mut().insert(epoch, committee);
                        self.epoch.set(epoch);
                        // Announce the new epoch: the event index is the epoch number and the
                        // value is the serialized hash of the committee blob.
                        txn_tracker.add_event(
                            StreamId::system(EPOCH_STREAM_NAME),
                            epoch.0,
                            bcs::to_bytes(&blob_hash)?,
                        );
                    }
                    AdminOperation::RemoveCommittee { epoch } => {
                        // Fails if no committee is registered for that epoch.
                        ensure!(
                            self.committees.get_mut().remove(&epoch).is_some(),
                            ExecutionError::InvalidCommitteeRemoval
                        );
                        // Announce the removal: the event index is the epoch number and the
                        // value is empty.
                        txn_tracker.add_event(
                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                            epoch.0,
                            vec![],
                        );
                    }
                }
            }
            PublishModule { module_id } => {
                for blob_id in module_id.bytecode_blob_ids() {
                    self.blob_published(&blob_id, txn_tracker)?;
                }
            }
            CreateApplication {
                module_id,
                parameters,
                instantiation_argument,
                required_application_ids,
            } => {
                let CreateApplicationResult { app_id } = self
                    .create_application(
                        context.chain_id,
                        context.height,
                        module_id,
                        parameters,
                        required_application_ids,
                        txn_tracker,
                    )
                    .await?;
                // Hand the instantiation argument back to the caller together with the new ID.
                new_application = Some((app_id, instantiation_argument));
            }
            PublishDataBlob { blob_hash } => {
                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
            }
            VerifyBlob { blob_id } => {
                self.assert_blob_exists(blob_id).await?;
                // The blob read is tracked with a size of zero.
                resource_controller
                    .with_state(self)
                    .await?
                    .track_blob_read(0)?;
                self.blob_used(txn_tracker, blob_id).await?;
            }
            ProcessNewEpoch(epoch) => {
                self.check_next_epoch(epoch)?;
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event published by the admin chain's `CreateCommittee` operation.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // The event value is obtained through the oracle mechanism of the tracker.
                let bytes = txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(event_id.clone(), bytes))
                    })
                    .await?
                    .to_event(&event_id)?;
                // The event value is the serialized hash of the committee blob.
                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                self.blob_used(txn_tracker, blob_id).await?;
                self.committees.get_mut().insert(epoch, committee);
                self.epoch.set(epoch);
            }
            ProcessRemovedEpoch(epoch) => {
                // Fails if no committee is registered for that epoch.
                ensure!(
                    self.committees.get_mut().remove(&epoch).is_some(),
                    ExecutionError::InvalidCommitteeRemoval
                );
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event published by the admin chain's `RemoveCommittee` operation.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // Only the existence of the event matters here; its value is not used.
                txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(event_id, bytes))
                    })
                    .await?;
            }
            UpdateStreams(streams) => {
                // Collected across all streams, so that a single error can report all of them.
                let mut missing_events = Vec::new();
                for (chain_id, stream_id, next_index) in streams {
                    let subscriptions = self
                        .event_subscriptions
                        .get_mut_or_default(&(chain_id, stream_id.clone()))
                        .await?;
                    // The tracker must strictly advance.
                    ensure!(
                        subscriptions.next_index < next_index,
                        ExecutionError::OutdatedUpdateStreams
                    );
                    // Every subscribed application gets to process the range of new events.
                    for application_id in &subscriptions.applications {
                        txn_tracker.add_stream_to_process(
                            *application_id,
                            chain_id,
                            stream_id.clone(),
                            subscriptions.next_index,
                            next_index,
                        );
                    }
                    subscriptions.next_index = next_index;
                    // The last event covered by this update.
                    let index = next_index
                        .checked_sub(1)
                        .ok_or(ArithmeticError::Underflow)?;
                    let event_id = EventId {
                        chain_id,
                        stream_id,
                        index,
                    };
                    let extra = self.context().extra();
                    // Check that the last event exists; a missing event is recorded rather
                    // than failing immediately.
                    txn_tracker
                        .oracle(|| async {
                            if !extra.contains_event(event_id.clone()).await? {
                                missing_events.push(event_id.clone());
                            }
                            Ok(OracleResponse::EventExists(event_id))
                        })
                        .await?;
                }
                ensure!(
                    missing_events.is_empty(),
                    ExecutionError::EventsNotFound(missing_events)
                );
            }
        }

        Ok(new_application)
    }
580
581    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
582    /// epoch.
583    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
584        let expected = self.epoch.get().try_add_one()?;
585        ensure!(
586            provided == expected,
587            ExecutionError::InvalidCommitteeEpoch { provided, expected }
588        );
589        Ok(())
590    }
591
592    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
593        if owner == &AccountOwner::CHAIN {
594            let new_balance = self.balance.get().saturating_add(amount);
595            self.balance.set(new_balance);
596        } else {
597            let balance = self.balances.get_mut_or_default(owner).await?;
598            *balance = balance.saturating_add(amount);
599        }
600        Ok(())
601    }
602
603    async fn credit_or_send_message(
604        &mut self,
605        source: AccountOwner,
606        recipient: Account,
607        amount: Amount,
608    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
609        let source_chain_id = self.context().extra().chain_id();
610        if recipient.chain_id == source_chain_id {
611            // Handle same-chain transfer locally.
612            let target = recipient.owner;
613            self.credit(&target, amount).await?;
614            Ok(None)
615        } else {
616            // Handle cross-chain transfer with message.
617            let message = SystemMessage::Credit {
618                amount,
619                source,
620                target: recipient.owner,
621            };
622            Ok(Some(
623                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
624            ))
625        }
626    }
627
628    pub async fn transfer(
629        &mut self,
630        authenticated_owner: Option<AccountOwner>,
631        authenticated_application_id: Option<ApplicationId>,
632        source: AccountOwner,
633        recipient: Account,
634        amount: Amount,
635    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
636        if source == AccountOwner::CHAIN {
637            ensure!(
638                authenticated_owner.is_some()
639                    && self
640                        .ownership
641                        .get()
642                        .verify_owner(&authenticated_owner.unwrap()),
643                ExecutionError::UnauthenticatedTransferOwner
644            );
645        } else {
646            ensure!(
647                authenticated_owner == Some(source)
648                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
649                ExecutionError::UnauthenticatedTransferOwner
650            );
651        }
652        ensure!(
653            amount > Amount::ZERO,
654            ExecutionError::IncorrectTransferAmount
655        );
656        self.debit(&source, amount).await?;
657        self.credit_or_send_message(source, recipient, amount).await
658    }
659
660    pub async fn claim(
661        &mut self,
662        authenticated_owner: Option<AccountOwner>,
663        authenticated_application_id: Option<ApplicationId>,
664        source: AccountOwner,
665        target_id: ChainId,
666        recipient: Account,
667        amount: Amount,
668    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
669        ensure!(
670            authenticated_owner == Some(source)
671                || authenticated_application_id.map(AccountOwner::from) == Some(source),
672            ExecutionError::UnauthenticatedClaimOwner
673        );
674        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
675
676        let current_chain_id = self.context().extra().chain_id();
677        if target_id == current_chain_id {
678            // Handle same-chain claim locally by processing the withdraw operation directly
679            self.debit(&source, amount).await?;
680            self.credit_or_send_message(source, recipient, amount).await
681        } else {
682            // Handle cross-chain claim with Withdraw message
683            let message = SystemMessage::Withdraw {
684                amount,
685                owner: source,
686                recipient,
687            };
688            Ok(Some(
689                OutgoingMessage::new(target_id, message)
690                    .with_authenticated_owner(authenticated_owner),
691            ))
692        }
693    }
694
695    /// Debits an [`Amount`] of tokens from an account's balance.
696    async fn debit(
697        &mut self,
698        account: &AccountOwner,
699        amount: Amount,
700    ) -> Result<(), ExecutionError> {
701        let balance = if account == &AccountOwner::CHAIN {
702            self.balance.get_mut()
703        } else {
704            self.balances.get_mut(account).await?.ok_or_else(|| {
705                ExecutionError::InsufficientBalance {
706                    balance: Amount::ZERO,
707                    account: *account,
708                }
709            })?
710        };
711
712        balance
713            .try_sub_assign(amount)
714            .map_err(|_| ExecutionError::InsufficientBalance {
715                balance: *balance,
716                account: *account,
717            })?;
718
719        if account != &AccountOwner::CHAIN && balance.is_zero() {
720            self.balances.remove(account)?;
721        }
722
723        Ok(())
724    }
725
726    /// Executes a cross-chain message that represents the recipient's side of an operation.
727    pub async fn execute_message(
728        &mut self,
729        context: MessageContext,
730        message: SystemMessage,
731    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
732        let mut outcome = Vec::new();
733        use SystemMessage::*;
734        match message {
735            Credit {
736                amount,
737                source,
738                target,
739            } => {
740                let receiver = if context.is_bouncing { source } else { target };
741                self.credit(&receiver, amount).await?;
742            }
743            Withdraw {
744                amount,
745                owner,
746                recipient,
747            } => {
748                self.debit(&owner, amount).await?;
749                if let Some(message) = self
750                    .credit_or_send_message(owner, recipient, amount)
751                    .await?
752                {
753                    outcome.push(message);
754                }
755            }
756        }
757        Ok(outcome)
758    }
759
760    /// Initializes the system application state on a newly opened chain.
761    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
762    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
763        if self.description.get().is_some() {
764            // already initialized
765            return Ok(true);
766        }
767        let description_blob = self
768            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
769            .await?;
770        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
771        let InitialChainConfig {
772            ownership,
773            epoch,
774            balance,
775            min_active_epoch,
776            max_active_epoch,
777            application_permissions,
778        } = description.config().clone();
779        self.timestamp.set(description.timestamp());
780        self.description.set(Some(description));
781        self.epoch.set(epoch);
782        let committees = self
783            .context()
784            .extra()
785            .committees_for(min_active_epoch..=max_active_epoch)
786            .await?;
787        self.committees.set(committees);
788        let admin_id = self
789            .context()
790            .extra()
791            .get_network_description()
792            .await?
793            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
794            .admin_chain_id;
795        self.admin_id.set(Some(admin_id));
796        self.ownership.set(ownership);
797        self.balance.set(balance);
798        self.application_permissions.set(application_permissions);
799        Ok(false)
800    }
801
802    pub fn handle_query(
803        &mut self,
804        context: QueryContext,
805        _query: SystemQuery,
806    ) -> QueryOutcome<SystemResponse> {
807        let response = SystemResponse {
808            chain_id: context.chain_id,
809            balance: *self.balance.get(),
810        };
811        QueryOutcome {
812            response,
813            operations: vec![],
814        }
815    }
816
817    /// Returns the messages to open a new chain, and subtracts the new chain's balance
818    /// from this chain's.
819    pub async fn open_chain(
820        &mut self,
821        config: OpenChainConfig,
822        parent: ChainId,
823        block_height: BlockHeight,
824        timestamp: Timestamp,
825        txn_tracker: &mut TransactionTracker,
826    ) -> Result<ChainId, ExecutionError> {
827        let chain_index = txn_tracker.next_chain_index();
828        let chain_origin = ChainOrigin::Child {
829            parent,
830            block_height,
831            chain_index,
832        };
833        let init_chain_config = config.init_chain_config(
834            *self.epoch.get(),
835            self.committees
836                .get()
837                .keys()
838                .min()
839                .copied()
840                .unwrap_or(Epoch::ZERO),
841            self.committees
842                .get()
843                .keys()
844                .max()
845                .copied()
846                .unwrap_or(Epoch::ZERO),
847        );
848        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
849        let child_id = chain_description.id();
850        self.debit(&AccountOwner::CHAIN, config.balance).await?;
851        let blob = Blob::new_chain_description(&chain_description);
852        txn_tracker.add_created_blob(blob);
853        Ok(child_id)
854    }
855
    /// Marks this chain as closed by setting the `closed` flag to `true`.
    pub fn close_chain(&mut self) {
        self.closed.set(true);
    }
859
860    pub async fn create_application(
861        &mut self,
862        chain_id: ChainId,
863        block_height: BlockHeight,
864        module_id: ModuleId,
865        parameters: Vec<u8>,
866        required_application_ids: Vec<ApplicationId>,
867        txn_tracker: &mut TransactionTracker,
868    ) -> Result<CreateApplicationResult, ExecutionError> {
869        let application_index = txn_tracker.next_application_index();
870
871        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
872        // We only remember to register the blobs that aren't recorded in `used_blobs`
873        // already.
874        for blob_id in blob_ids {
875            self.blob_used(txn_tracker, blob_id).await?;
876        }
877
878        let application_description = ApplicationDescription {
879            module_id,
880            creator_chain_id: chain_id,
881            block_height,
882            application_index,
883            parameters,
884            required_application_ids,
885        };
886        self.check_required_applications(&application_description, txn_tracker)
887            .await?;
888
889        let blob = Blob::new_application_description(&application_description);
890        self.used_blobs.insert(&blob.id())?;
891        txn_tracker.add_created_blob(blob);
892
893        Ok(CreateApplicationResult {
894            app_id: ApplicationId::from(&application_description),
895        })
896    }
897
898    async fn check_required_applications(
899        &mut self,
900        application_description: &ApplicationDescription,
901        txn_tracker: &mut TransactionTracker,
902    ) -> Result<(), ExecutionError> {
903        // Make sure that referenced applications IDs have been registered.
904        for required_id in &application_description.required_application_ids {
905            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
906        }
907        Ok(())
908    }
909
910    /// Retrieves an application's description.
911    pub async fn describe_application(
912        &mut self,
913        id: ApplicationId,
914        txn_tracker: &mut TransactionTracker,
915    ) -> Result<ApplicationDescription, ExecutionError> {
916        let blob_id = id.description_blob_id();
917        let content = match txn_tracker.created_blobs().get(&blob_id) {
918            Some(content) => content.clone(),
919            None => self.read_blob_content(blob_id).await?,
920        };
921        self.blob_used(txn_tracker, blob_id).await?;
922        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
923
924        let blob_ids = self
925            .check_bytecode_blobs(&description.module_id, txn_tracker)
926            .await?;
927        // We only remember to register the blobs that aren't recorded in `used_blobs`
928        // already.
929        for blob_id in blob_ids {
930            self.blob_used(txn_tracker, blob_id).await?;
931        }
932
933        self.check_required_applications(&description, txn_tracker)
934            .await?;
935
936        Ok(description)
937    }
938
939    /// Retrieves the recursive dependencies of applications and applies a topological sort.
940    pub async fn find_dependencies(
941        &mut self,
942        mut stack: Vec<ApplicationId>,
943        txn_tracker: &mut TransactionTracker,
944    ) -> Result<Vec<ApplicationId>, ExecutionError> {
945        // What we return at the end.
946        let mut result = Vec::new();
947        // The entries already inserted in `result`.
948        let mut sorted = HashSet::new();
949        // The entries for which dependencies have already been pushed once to the stack.
950        let mut seen = HashSet::new();
951
952        while let Some(id) = stack.pop() {
953            if sorted.contains(&id) {
954                continue;
955            }
956            if seen.contains(&id) {
957                // Second time we see this entry. It was last pushed just before its
958                // dependencies -- which are now fully sorted.
959                sorted.insert(id);
960                result.push(id);
961                continue;
962            }
963            // First time we see this entry:
964            // 1. Mark it so that its dependencies are no longer pushed to the stack.
965            seen.insert(id);
966            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
967            stack.push(id);
968            let app = self.describe_application(id, txn_tracker).await?;
969            for child in app.required_application_ids.iter().rev() {
970                if !seen.contains(child) {
971                    stack.push(*child);
972                }
973            }
974        }
975        Ok(result)
976    }
977
978    /// Records a blob that is used in this block. If this is the first use on this chain, creates
979    /// an oracle response for it.
980    pub(crate) async fn blob_used(
981        &mut self,
982        txn_tracker: &mut TransactionTracker,
983        blob_id: BlobId,
984    ) -> Result<bool, ExecutionError> {
985        if self.used_blobs.contains(&blob_id).await? {
986            return Ok(false); // Nothing to do.
987        }
988        self.used_blobs.insert(&blob_id)?;
989        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
990        Ok(true)
991    }
992
993    /// Records a blob that is published in this block. This does not create an oracle entry, and
994    /// the blob can be used without using an oracle in the future on this chain.
995    fn blob_published(
996        &mut self,
997        blob_id: &BlobId,
998        txn_tracker: &mut TransactionTracker,
999    ) -> Result<(), ExecutionError> {
1000        self.used_blobs.insert(blob_id)?;
1001        txn_tracker.add_published_blob(*blob_id);
1002        Ok(())
1003    }
1004
1005    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1006        match self.context().extra().get_blob(blob_id).await {
1007            Ok(Some(blob)) => Ok(blob.into()),
1008            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1009            Err(error) => Err(error.into()),
1010        }
1011    }
1012
1013    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1014        if self.context().extra().contains_blob(blob_id).await? {
1015            Ok(())
1016        } else {
1017            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1018        }
1019    }
1020
1021    async fn check_bytecode_blobs(
1022        &mut self,
1023        module_id: &ModuleId,
1024        txn_tracker: &TransactionTracker,
1025    ) -> Result<Vec<BlobId>, ExecutionError> {
1026        let blob_ids = module_id.bytecode_blob_ids();
1027
1028        let mut missing_blobs = Vec::new();
1029        for blob_id in &blob_ids {
1030            // First check if blob is present in created_blobs
1031            if txn_tracker.created_blobs().contains_key(blob_id) {
1032                continue; // Blob found in created_blobs, it's ok
1033            }
1034            // If not in created_blobs, check storage
1035            if !self.context().extra().contains_blob(*blob_id).await? {
1036                missing_blobs.push(*blob_id);
1037            }
1038        }
1039        ensure!(
1040            missing_blobs.is_empty(),
1041            ExecutionError::BlobsNotFound(missing_blobs)
1042        );
1043
1044        Ok(blob_ids)
1045    }
1046}