Skip to main content

linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet},
11    sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{
19        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20        ChainDescription, ChainOrigin, Cursor, Epoch, InitialChainConfig, OracleResponse,
21        Timestamp,
22    },
23    ensure, hex_debug,
24    identifiers::{
25        Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, OwnerSpender, StreamId,
26    },
27    ownership::{ChainOwnership, TimeoutConfig},
28};
29use linera_views::{
30    context::Context,
31    lazy_register_view::LazyRegisterView,
32    map_view::MapView,
33    register_view::RegisterView,
34    set_view::SetView,
35    views::{ClonableView, ReplaceContext, View},
36    ViewError,
37};
38use serde::{Deserialize, Serialize};
39
40#[cfg(test)]
41use crate::test_utils::SystemExecutionState;
42use crate::{
43    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
44    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
45    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
46};
47
/// The event stream name for new epochs and committees.
///
/// Events on this system stream are emitted by [`AdminOperation::CreateCommittee`], are
/// indexed by the epoch number, and carry a BCS-serialized [`EpochEventData`] payload.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// The event stream name for removed epochs.
///
/// Events on this system stream are emitted by [`AdminOperation::RemoveCommittee`], are
/// indexed by the removed epoch's number, and have an empty payload.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
52
/// The data stored in an epoch creation event.
///
/// This is serialized with BCS as the payload of events on the [`EPOCH_STREAM_NAME`]
/// stream, and deserialized again when a chain processes the new epoch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EpochEventData {
    /// The hash of the committee blob for this epoch.
    pub blob_hash: CryptoHash,
    /// The timestamp when the epoch was created on the admin chain.
    pub timestamp: Timestamp,
}
61
/// Prometheus metrics for system operations. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The number of times the `SystemOperation::OpenChain` operation was executed.
    /// The counter is registered without any labels.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
78
/// A view accessing the execution state of the system of a chain.
///
/// Each field is a sub-view over the same context `C`.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct SystemExecutionStateView<C> {
    /// How the chain was created. May be unknown for inactive chains.
    pub description: LazyRegisterView<C, Option<ChainDescription>>,
    /// The number identifying the current configuration.
    pub epoch: RegisterView<C, Epoch>,
    /// The ID of the admin chain, if known. Admin operations are only accepted when this
    /// equals the ID of the chain executing them.
    pub admin_chain_id: RegisterView<C, Option<ChainId>>,
    /// The blob hash of the committee that is allowed to sign the next block on this chain.
    /// `None` until the chain is initialized.
    pub committee_hash: RegisterView<C, Option<CryptoHash>>,
    /// Ownership of the chain.
    pub ownership: LazyRegisterView<C, ChainOwnership>,
    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
    /// This is the account addressed by `AccountOwner::CHAIN`.
    pub balance: RegisterView<C, Amount>,
    /// Balances attributed to a given owner (any owner other than `AccountOwner::CHAIN`).
    pub balances: MapView<C, AccountOwner, Amount>,
    /// Allowances for spending from one account by another.
    pub allowances: MapView<C, OwnerSpender, Amount>,
    /// The timestamp of the most recent block.
    pub timestamp: RegisterView<C, Timestamp>,
    /// Whether this chain has been closed.
    pub closed: RegisterView<C, bool>,
    /// Permissions for applications on this chain.
    pub application_permissions: LazyRegisterView<C, ApplicationPermissions>,
    /// Blobs that have been used or published on this chain.
    pub used_blobs: SetView<C, BlobId>,
    /// The event stream subscriptions of applications on this chain, keyed by the
    /// publishing chain and the stream.
    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
    /// The number of events in the streams that this chain is writing to.
    pub stream_event_counts: MapView<C, StreamId, u32>,
    /// For each chain that previously received messages from this one and has since
    /// notified us via [`SystemMessage::CheckpointAck`], records the cursor past the last
    /// message we sent that the recipient finalized, together with the hash of the
    /// recipient's block carrying the notification.
    pub finalized_sent_messages: MapView<C, ChainId, (Cursor, CryptoHash)>,
    /// For each recipient chain, the cursors `(block_height, transaction_index)` of
    /// our outgoing bundles that haven't yet been acknowledged via
    /// [`SystemMessage::CheckpointAck`]. Maintained on-chain (as opposed to the local
    /// off-chain outbox in chain state) so it is identical across validators and can
    /// feed the checkpoint oracle response's `outbox_block_hashes` (the unique heights
    /// across all cursors). We store cursors rather than heights so that an ack at a
    /// finer-grained cursor than the last bundle in a block can fully evict the entry
    /// — important for high-fanout chains whose recipients only interact once.
    ///
    /// Excludes bundles whose only messages to a given recipient were
    /// `SystemMessage::CheckpointAck`: those don't trigger a return notification
    /// from the recipient, so tracking them would accumulate forever.
    pub unfinalized_message_blocks: MapView<C, ChainId, BTreeSet<Cursor>>,
    /// Chains from which we've received at least one non-`CheckpointAck` message
    /// since our last `SystemOperation::Checkpoint`. Determines whom to notify with a
    /// `SystemMessage::CheckpointAck` at the next checkpoint operation. Excluding
    /// `CheckpointAck` messages here is what breaks the otherwise-perpetual
    /// notification ping-pong between two chains that ever exchanged a real message.
    pub pending_checkpoint_ack_targets: SetView<C, ChainId>,
}
137
138impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
139    type Target = SystemExecutionStateView<C2>;
140
141    async fn with_context(
142        &mut self,
143        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
144    ) -> Self::Target {
145        SystemExecutionStateView {
146            description: self.description.with_context(ctx.clone()).await,
147            epoch: self.epoch.with_context(ctx.clone()).await,
148            admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
149            committee_hash: self.committee_hash.with_context(ctx.clone()).await,
150            ownership: self.ownership.with_context(ctx.clone()).await,
151            balance: self.balance.with_context(ctx.clone()).await,
152            balances: self.balances.with_context(ctx.clone()).await,
153            allowances: self.allowances.with_context(ctx.clone()).await,
154            timestamp: self.timestamp.with_context(ctx.clone()).await,
155            closed: self.closed.with_context(ctx.clone()).await,
156            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
157            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
158            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
159            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
160            finalized_sent_messages: self.finalized_sent_messages.with_context(ctx.clone()).await,
161            unfinalized_message_blocks: self
162                .unfinalized_message_blocks
163                .with_context(ctx.clone())
164                .await,
165            pending_checkpoint_ack_targets: self
166                .pending_checkpoint_ack_targets
167                .with_context(ctx.clone())
168                .await,
169        }
170    }
171}
172
/// The applications subscribing to a particular stream, and their per-application event indices.
///
/// `min_next_index` is a cache derived from `applications`; call `recalculate_min` after
/// changing `applications` to bring it back in sync.
#[derive(Debug, Clone, Serialize, Deserialize, Allocative)]
pub struct EventSubscriptions {
    /// Cached minimum of all per-application `next_index` values. Used for short-circuit
    /// filtering: if the next available event index is <= this value, no application needs
    /// processing. Set to `u32::MAX` when no applications are subscribed.
    pub min_next_index: u32,
    /// The applications that are subscribed to this stream, each mapped to the next event
    /// index that they need to process.
    pub applications: BTreeMap<ApplicationId, u32>,
}
184
185impl Default for EventSubscriptions {
186    fn default() -> Self {
187        Self {
188            min_next_index: u32::MAX,
189            applications: BTreeMap::new(),
190        }
191    }
192}
193
194impl EventSubscriptions {
195    pub(crate) fn recalculate_min(&mut self) {
196        self.min_next_index = self
197            .applications
198            .values()
199            .copied()
200            .min()
201            .unwrap_or(u32::MAX);
202    }
203}
204
/// The initial configuration for a new chain.
///
/// This is the payload of `SystemOperation::OpenChain`. The epoch is not part of it; it is
/// supplied separately when converting to an [`InitialChainConfig`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial chain balance.
    pub balance: Amount,
    /// The initial application permissions.
    pub application_permissions: ApplicationPermissions,
}
215
216impl OpenChainConfig {
217    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
218    /// parameters.
219    pub fn init_chain_config(&self, epoch: Epoch) -> InitialChainConfig {
220        InitialChainConfig {
221            application_permissions: self.application_permissions.clone(),
222            balance: self.balance,
223            epoch,
224            ownership: self.ownership.clone(),
225        }
226    }
227}
228
/// A system operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemOperation {
    /// Transfers `amount` units of value from the given owner's account to the recipient.
    /// If `owner` is `AccountOwner::CHAIN`, the transfer is made on behalf of the chain
    /// itself and must be authenticated by one of the chain's owners. The amount must be
    /// greater than zero.
    Transfer {
        owner: AccountOwner,
        recipient: Account,
        amount: Amount,
    },
    /// Claims `amount` units of value from the given owner's account in the remote
    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
    /// process the message.
    Claim {
        owner: AccountOwner,
        target_id: ChainId,
        recipient: Account,
        amount: Amount,
    },
    /// Creates (or activates) a new chain.
    /// This will automatically subscribe to the future committees created by `admin_chain_id`.
    OpenChain(OpenChainConfig),
    /// Closes the chain.
    CloseChain,
    /// Changes the ownership of the chain.
    ChangeOwnership {
        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The regular owners, with their weights that determine how often they are round leader.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// The leader of the first single-leader round. If not set, this is random like other rounds.
        #[debug(skip_if = Option::is_none)]
        first_leader: Option<AccountOwner>,
        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
        multi_leader_rounds: u32,
        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
        /// This should only be `true` on chains with restrictive application permissions and an
        /// application-based mechanism to select block proposers.
        open_multi_leader_rounds: bool,
        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
        timeout_config: TimeoutConfig,
    },
    /// Changes the application permissions configuration on this chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists. Otherwise the block fails.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        module_id: ModuleId,
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes an event about a new epoch and committee. The given epoch must be exactly
    /// one higher than the chain's current epoch.
    ProcessNewEpoch(Epoch),
    /// Updates the event stream trackers. The application must already be subscribed to the
    /// stream, and `next_index` must be greater than its currently recorded index.
    UpdateStream {
        application_id: ApplicationId,
        chain_id: ChainId,
        stream_id: StreamId,
        next_index: u32,
    },
    /// Publishes a canonical snapshot of the chain's execution state as a blob,
    /// resetting the execution-state hash to the hash of that content. This allows
    /// future nodes to bootstrap from the snapshot instead of replaying the chain's
    /// history. Subject to a strict set of preconditions on the chain's state.
    Checkpoint,
}
310
/// Operations that are only allowed on the admin chain.
///
/// Executing any of these on a chain whose ID differs from its recorded admin chain ID
/// fails with `ExecutionError::AdminOperationOnNonAdminChain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum AdminOperation {
    /// Publishes a new committee as a blob. This can be assigned to an epoch using
    /// [`AdminOperation::CreateCommittee`] in a later block.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
    /// [`SystemOperation::ProcessNewEpoch`]. `epoch` must be exactly one higher than the
    /// admin chain's current epoch.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes a committee. Blocks signed by this committee will only be accepted once they
    /// have been followed (hence re-certified) by a block certified by a recent committee.
    /// Committees must be removed in increasing epoch order, and only epochs strictly
    /// lower than the current one can be removed.
    RemoveCommittee { epoch: Epoch },
}
324
/// A system message meant to be executed on a remote chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target` -- unless the message is
    /// bouncing, in which case `source` is credited instead. Cross-chain transfers send
    /// this as a tracked message.
    Credit {
        target: AccountOwner,
        amount: Amount,
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account and starts a transfer to credit
    /// the recipient. The message must be properly authenticated. Receiver chains may
    /// refuse it depending on their configuration.
    Withdraw {
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    },
    /// Sent by a chain that just executed `SystemOperation::Checkpoint` to each chain
    /// it has received at least one non-`CheckpointAck` message from since its
    /// previous checkpoint. `latest_received_cursor` is the position past the last
    /// bundle from the recipient that the sender has consumed. The recipient records
    /// this in `finalized_sent_messages` so that, when it later checkpoints its own
    /// state, it can drop already-delivered outgoing messages from its outbox dump.
    CheckpointAck { latest_received_cursor: Cursor },
}
351
/// A query to the system state. This is a unit struct: the query carries no parameters.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
355
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// A chain ID. NOTE(review): presumably the ID of the queried chain — confirm against
    /// the query handler.
    pub chain_id: ChainId,
    /// A balance. NOTE(review): presumably the chain's own balance rather than a
    /// per-owner one — confirm against the query handler.
    pub balance: Amount,
}
362
/// Optional user message attached to a transfer. When present, the payload is exactly
/// 32 bytes long; the default value carries no payload.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
366
/// The outcome of creating an application on this chain.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
}
371
372impl<C> SystemExecutionStateView<C>
373where
374    C: Context + Clone + 'static,
375    C::Extra: ExecutionRuntimeContext,
376{
377    /// Invariant for the states of active chains.
378    pub async fn is_active(&self) -> Result<bool, ViewError> {
379        Ok(self.description.get().await?.is_some()
380            && self.ownership.get().await?.is_active()
381            && self.admin_chain_id.get().is_some())
382    }
383
384    /// Returns the current committee, if the chain has been initialized.
385    pub async fn current_committee(
386        &self,
387    ) -> Result<Option<(Epoch, Arc<Committee>)>, ExecutionError> {
388        let Some(hash) = *self.committee_hash.get() else {
389            return Ok(None);
390        };
391        let epoch = *self.epoch.get();
392        let committee = self
393            .context()
394            .extra()
395            .get_or_load_committee_by_hash(hash)
396            .await?;
397        Ok(Some((epoch, committee)))
398    }
399
400    async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
401        match self.context().extra().get_event(event_id.clone()).await? {
402            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
403            Some(vec) => Ok(vec),
404        }
405    }
406
    /// Executes the sender's side of an operation and returns a list of actions to be
    /// taken.
    ///
    /// Outgoing messages, events, blob usage and oracle responses are recorded in
    /// `txn_tracker`.
    ///
    /// # Returns
    ///
    /// `Some((application_id, instantiation_argument))` when the operation was
    /// [`SystemOperation::CreateApplication`], so that the caller can instantiate the new
    /// application; `None` for every other operation.
    ///
    /// # Errors
    ///
    /// Returns an [`ExecutionError`] if the operation's preconditions are not met. In
    /// particular, [`SystemOperation::Admin`] is rejected unless this chain is the admin
    /// chain, and [`SystemOperation::Checkpoint`] is always rejected here because it has to
    /// be dispatched at a higher level.
    pub async fn execute_operation(
        &mut self,
        context: OperationContext,
        operation: SystemOperation,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
        use SystemOperation::*;
        // Only set by `CreateApplication`.
        let mut new_application = None;
        match operation {
            OpenChain(config) => {
                let _chain_id = self
                    .open_chain(
                        config,
                        context.chain_id,
                        context.height,
                        context.timestamp,
                        txn_tracker,
                    )
                    .await?;
                #[cfg(with_metrics)]
                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
            }
            ChangeOwnership {
                super_owners,
                owners,
                first_leader,
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            } => {
                // The new ownership replaces the previous one entirely.
                self.ownership.set(ChainOwnership {
                    super_owners: super_owners.into_iter().collect(),
                    owners: owners.into_iter().collect(),
                    first_leader,
                    multi_leader_rounds,
                    open_multi_leader_rounds,
                    timeout_config,
                });
            }
            ChangeApplicationPermissions(application_permissions) => {
                self.application_permissions.set(application_permissions);
            }
            CloseChain => self.close_chain(),
            Transfer {
                owner,
                amount,
                recipient,
            } => {
                // System-level transfers are never authenticated by an application.
                let maybe_message = self
                    .transfer(context.authenticated_owner, None, owner, recipient, amount)
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Claim {
                owner,
                target_id,
                recipient,
                amount,
            } => {
                let maybe_message = self
                    .claim(
                        context.authenticated_owner,
                        None,
                        owner,
                        target_id,
                        recipient,
                        amount,
                    )
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Admin(admin_operation) => {
                // Admin operations are only accepted on the admin chain itself.
                ensure!(
                    *self.admin_chain_id.get() == Some(context.chain_id),
                    ExecutionError::AdminOperationOnNonAdminChain
                );
                match admin_operation {
                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
                        self.blob_published(
                            &BlobId::new(blob_hash, BlobType::Committee),
                            txn_tracker,
                        )?;
                    }
                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
                        self.check_next_epoch(epoch)?;
                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                        // Validate that the blob exists and deserializes as a Committee.
                        self.context()
                            .extra()
                            .get_or_load_committee_by_hash(blob_hash)
                            .await?;
                        self.blob_used(txn_tracker, blob_id).await?;
                        self.committee_hash.set(Some(blob_hash));
                        self.epoch.set(epoch);
                        let event_data = EpochEventData {
                            blob_hash,
                            timestamp: context.timestamp,
                        };
                        let stream_id = StreamId::system(EPOCH_STREAM_NAME);
                        // The event is stored at index `epoch`, so the stream's event count
                        // becomes `epoch + 1`.
                        let next_index = epoch.0.checked_add(1).ok_or(ArithmeticError::Overflow)?;
                        self.stream_event_counts.insert(&stream_id, next_index)?;
                        txn_tracker.add_event(stream_id, epoch.0, bcs::to_bytes(&event_data)?);
                    }
                    AdminOperation::RemoveCommittee { epoch } => {
                        let stream_id = StreamId::system(REMOVED_EPOCH_STREAM_NAME);
                        let count = self.stream_event_counts.get(&stream_id).await?.unwrap_or(0);
                        // Revocations must happen in increasing epoch order, so the stream's
                        // indices stay sequential.
                        ensure!(
                            count == epoch.0 && epoch < *self.epoch.get(),
                            ExecutionError::InvalidCommitteeRemoval
                        );
                        let next_index = epoch.0.checked_add(1).ok_or(ArithmeticError::Overflow)?;
                        self.stream_event_counts.insert(&stream_id, next_index)?;
                        // Removal events carry no payload.
                        txn_tracker.add_event(stream_id, epoch.0, vec![]);
                    }
                }
            }
            PublishModule { module_id } => {
                for blob_id in module_id.bytecode_blob_ids() {
                    self.blob_published(&blob_id, txn_tracker)?;
                }
            }
            CreateApplication {
                module_id,
                parameters,
                instantiation_argument,
                required_application_ids,
            } => {
                let CreateApplicationResult { app_id } = self
                    .create_application(
                        context.chain_id,
                        context.height,
                        module_id,
                        parameters,
                        required_application_ids,
                        txn_tracker,
                    )
                    .await?;
                // Hand the instantiation argument back to the caller together with the new ID.
                new_application = Some((app_id, instantiation_argument));
            }
            PublishDataBlob { blob_hash } => {
                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
            }
            VerifyBlob { blob_id } => {
                self.assert_blob_exists(blob_id).await?;
                // The blob read is tracked with a size of zero.
                resource_controller
                    .with_state(self)
                    .await?
                    .track_blob_read(0)?;
                self.blob_used(txn_tracker, blob_id).await?;
            }
            ProcessNewEpoch(epoch) => {
                self.check_next_epoch(epoch)?;
                let admin_chain_id = self.admin_chain_id.get().ok_or_else(|| {
                    ExecutionError::InternalError(
                        "execute_operation called for uninitialized chain",
                    )
                })?;
                // The epoch event lives on the admin chain, at the index of the epoch.
                let event_id = EventId {
                    chain_id: admin_chain_id,
                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // Read the event through the transaction tracker's oracle mechanism.
                let bytes = txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(
                            event_id.clone(),
                            Arc::unwrap_or_clone(bytes),
                        ))
                    })
                    .await?
                    .to_event(&event_id)?;
                let event_data: EpochEventData = bcs::from_bytes(&bytes)?;
                let blob_id = BlobId::new(event_data.blob_hash, BlobType::Committee);
                // Validate that the blob exists and deserializes as a Committee.
                self.context()
                    .extra()
                    .get_or_load_committee_by_hash(event_data.blob_hash)
                    .await?;
                self.blob_used(txn_tracker, blob_id).await?;
                self.committee_hash.set(Some(event_data.blob_hash));
                self.epoch.set(epoch);
            }
            UpdateStream {
                application_id,
                chain_id,
                stream_id,
                next_index,
            } => {
                // NOTE(review): `get_mut_or_default` may create an empty entry before the
                // subscription check below; presumably discarded when the operation
                // fails — confirm.
                let subscriptions = self
                    .event_subscriptions
                    .get_mut_or_default(&(chain_id, stream_id.clone()))
                    .await?;
                let app_next_index = *subscriptions
                    .applications
                    .get(&application_id)
                    .ok_or(ExecutionError::UnsubscribedUpdateStream)?;
                // The tracker may only move forward.
                ensure!(
                    app_next_index < next_index,
                    ExecutionError::OutdatedUpdateStream
                );
                txn_tracker.add_stream_to_process(
                    application_id,
                    chain_id,
                    stream_id.clone(),
                    app_next_index,
                    next_index,
                );
                subscriptions
                    .applications
                    .insert(application_id, next_index);
                // Keep the cached minimum in sync with the updated index.
                subscriptions.recalculate_min();
                // `next_index` is greater than `app_next_index`, hence at least 1; this is the
                // index of the last event covered by the update.
                let index = next_index
                    .checked_sub(1)
                    .ok_or(ArithmeticError::Underflow)?;
                let event_id = EventId {
                    chain_id,
                    stream_id,
                    index,
                };
                let context = self.context();
                let extra = context.extra();
                // A missing event is collected here and reported after the oracle call.
                let mut missing_events = Vec::new();
                txn_tracker
                    .oracle(|| async {
                        if !extra.contains_event(event_id.clone()).await? {
                            missing_events.push(event_id.clone());
                        }
                        Ok(OracleResponse::EventExists(event_id))
                    })
                    .await?;
                ensure!(
                    missing_events.is_empty(),
                    ExecutionError::EventsNotFound(missing_events)
                );
            }
            Checkpoint => {
                return Err(ExecutionError::InternalError(
                    "SystemOperation::Checkpoint must be dispatched at ExecutionStateView level",
                ));
            }
        }

        Ok(new_application)
    }
657
658    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
659    /// epoch.
660    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
661        let expected = self.epoch.get().try_add_one()?;
662        ensure!(
663            provided == expected,
664            ExecutionError::InvalidCommitteeEpoch { provided, expected }
665        );
666        Ok(())
667    }
668
669    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
670        if owner == &AccountOwner::CHAIN {
671            let new_balance = self.balance.get().saturating_add(amount);
672            self.balance.set(new_balance);
673        } else {
674            let balance = self.balances.get_mut_or_default(owner).await?;
675            *balance = balance.saturating_add(amount);
676        }
677        Ok(())
678    }
679
680    async fn credit_or_send_message(
681        &mut self,
682        source: AccountOwner,
683        recipient: Account,
684        amount: Amount,
685    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
686        let source_chain_id = self.context().extra().chain_id();
687        if recipient.chain_id == source_chain_id {
688            // Handle same-chain transfer locally.
689            let target = recipient.owner;
690            self.credit(&target, amount).await?;
691            Ok(None)
692        } else {
693            // Handle cross-chain transfer with message.
694            let message = SystemMessage::Credit {
695                amount,
696                source,
697                target: recipient.owner,
698            };
699            Ok(Some(
700                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
701            ))
702        }
703    }
704
705    pub async fn transfer(
706        &mut self,
707        authenticated_owner: Option<AccountOwner>,
708        authenticated_application_id: Option<ApplicationId>,
709        source: AccountOwner,
710        recipient: Account,
711        amount: Amount,
712    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
713        if source == AccountOwner::CHAIN {
714            let authenticated_owner =
715                authenticated_owner.ok_or(ExecutionError::UnauthenticatedTransferOwner)?;
716            ensure!(
717                self.ownership.get().await?.is_owner(&authenticated_owner),
718                ExecutionError::UnauthenticatedTransferOwner
719            );
720        } else {
721            ensure!(
722                authenticated_owner == Some(source)
723                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
724                ExecutionError::UnauthenticatedTransferOwner
725            );
726        }
727        ensure!(
728            amount > Amount::ZERO,
729            ExecutionError::IncorrectTransferAmount
730        );
731        self.debit(&source, amount).await?;
732        self.credit_or_send_message(source, recipient, amount).await
733    }
734
735    pub async fn claim(
736        &mut self,
737        authenticated_owner: Option<AccountOwner>,
738        authenticated_application_id: Option<ApplicationId>,
739        source: AccountOwner,
740        target_id: ChainId,
741        recipient: Account,
742        amount: Amount,
743    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
744        ensure!(
745            authenticated_owner == Some(source)
746                || authenticated_application_id.map(AccountOwner::from) == Some(source),
747            ExecutionError::UnauthenticatedClaimOwner
748        );
749        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
750
751        let current_chain_id = self.context().extra().chain_id();
752        if target_id == current_chain_id {
753            // Handle same-chain claim locally by processing the withdraw operation directly
754            self.debit(&source, amount).await?;
755            self.credit_or_send_message(source, recipient, amount).await
756        } else {
757            // Handle cross-chain claim with Withdraw message
758            let message = SystemMessage::Withdraw {
759                amount,
760                owner: source,
761                recipient,
762            };
763            Ok(Some(
764                OutgoingMessage::new(target_id, message)
765                    .with_authenticated_owner(authenticated_owner),
766            ))
767        }
768    }
769
770    pub async fn approve(
771        &mut self,
772        authenticated_owner: Option<AccountOwner>,
773        authenticated_application_id: Option<ApplicationId>,
774        owner: AccountOwner,
775        spender: AccountOwner,
776        amount: Amount,
777    ) -> Result<(), ExecutionError> {
778        ensure!(
779            authenticated_owner == Some(owner)
780                || authenticated_application_id.map(AccountOwner::from) == Some(owner),
781            ExecutionError::UnauthenticatedTransferOwner
782        );
783
784        let owner_spender = OwnerSpender::new(owner, spender);
785        if amount == Amount::ZERO {
786            self.allowances.remove(&owner_spender)?;
787            return Ok(());
788        }
789        let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
790        *allowance = amount;
791
792        Ok(())
793    }
794
795    pub async fn transfer_from(
796        &mut self,
797        authenticated_owner: Option<AccountOwner>,
798        authenticated_application_id: Option<ApplicationId>,
799        owner: AccountOwner,
800        spender: AccountOwner,
801        recipient: Account,
802        amount: Amount,
803    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
804        ensure!(
805            authenticated_owner == Some(spender)
806                || authenticated_application_id.map(AccountOwner::from) == Some(spender),
807            ExecutionError::UnauthenticatedTransferOwner
808        );
809        ensure!(
810            amount > Amount::ZERO,
811            ExecutionError::IncorrectTransferAmount
812        );
813
814        // Debit from allowance
815        let owner_spender = OwnerSpender::new(owner, spender);
816        let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
817
818        allowance
819            .try_sub_assign(amount)
820            .map_err(|_| ExecutionError::InsufficientAllowance {
821                allowance: *allowance,
822                owner,
823                spender,
824            })?;
825
826        if allowance.is_zero() {
827            self.allowances.remove(&owner_spender)?;
828        }
829
830        // Debit from owner's balance
831        self.debit(&owner, amount).await?;
832
833        // Credit or send message
834        self.credit_or_send_message(owner, recipient, amount).await
835    }
836
837    /// Debits an [`Amount`] of tokens from an account's balance.
838    async fn debit(
839        &mut self,
840        account: &AccountOwner,
841        amount: Amount,
842    ) -> Result<(), ExecutionError> {
843        let balance = if account == &AccountOwner::CHAIN {
844            self.balance.get_mut()
845        } else {
846            self.balances.get_mut(account).await?.ok_or_else(|| {
847                ExecutionError::InsufficientBalance {
848                    balance: Amount::ZERO,
849                    account: *account,
850                }
851            })?
852        };
853
854        balance
855            .try_sub_assign(amount)
856            .map_err(|_| ExecutionError::InsufficientBalance {
857                balance: *balance,
858                account: *account,
859            })?;
860
861        if account != &AccountOwner::CHAIN && balance.is_zero() {
862            self.balances.remove(account)?;
863        }
864
865        Ok(())
866    }
867
868    /// Executes a cross-chain message that represents the recipient's side of an operation.
869    pub async fn execute_message(
870        &mut self,
871        context: MessageContext,
872        message: SystemMessage,
873    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
874        let mut outcome = Vec::new();
875        use SystemMessage::*;
876        match message {
877            Credit {
878                amount,
879                source,
880                target,
881            } => {
882                let receiver = if context.is_bouncing { source } else { target };
883                self.credit(&receiver, amount).await?;
884            }
885            Withdraw {
886                amount,
887                owner,
888                recipient,
889            } => {
890                self.debit(&owner, amount).await?;
891                if let Some(message) = self
892                    .credit_or_send_message(owner, recipient, amount)
893                    .await?
894                {
895                    outcome.push(message);
896                }
897            }
898            CheckpointAck {
899                latest_received_cursor,
900            } => {
901                self.finalized_sent_messages.insert(
902                    &context.origin,
903                    (latest_received_cursor, context.origin_certificate_hash),
904                )?;
905                // Drop every cursor the recipient has consumed. `split_off(&k)` on a
906                // `BTreeSet<Cursor>` returns the entries `>= k`, so this trims the
907                // strict prefix below `latest_received_cursor` and leaves any
908                // still-unfinalized bundles in place. A recipient that has consumed
909                // everything we ever sent ends up with an empty set and is evicted.
910                if let Some(mut cursors) =
911                    self.unfinalized_message_blocks.get(&context.origin).await?
912                {
913                    let retained = cursors.split_off(&latest_received_cursor);
914                    if retained.is_empty() {
915                        self.unfinalized_message_blocks.remove(&context.origin)?;
916                    } else {
917                        self.unfinalized_message_blocks
918                            .insert(&context.origin, retained)?;
919                    }
920                }
921            }
922        }
923        Ok(outcome)
924    }
925
926    /// Initializes the system application state on a newly opened chain.
927    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
928    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
929        if self.description.get().await?.is_some() {
930            // already initialized
931            return Ok(true);
932        }
933        let description_blob = self
934            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
935            .await?;
936        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
937        let InitialChainConfig {
938            ownership,
939            epoch,
940            balance,
941            application_permissions,
942        } = description.config().clone();
943        self.timestamp.set(description.timestamp());
944        self.description.set(Some(description));
945        self.epoch.set(epoch);
946
947        let committee_hash = *self
948            .context()
949            .extra()
950            .get_committee_hashes(epoch..=epoch)
951            .await?
952            .get(&epoch)
953            .expect("get_committee_hashes returns the requested epoch on success");
954        let admin_chain_id = self
955            .context()
956            .extra()
957            .get_network_description()
958            .await?
959            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
960            .admin_chain_id;
961
962        self.committee_hash.set(Some(committee_hash));
963        self.admin_chain_id.set(Some(admin_chain_id));
964        self.ownership.set(ownership);
965        self.balance.set(balance);
966        self.application_permissions.set(application_permissions);
967        Ok(false)
968    }
969
970    pub fn handle_query(
971        &mut self,
972        context: QueryContext,
973        _query: SystemQuery,
974    ) -> QueryOutcome<SystemResponse> {
975        let response = SystemResponse {
976            chain_id: context.chain_id,
977            balance: *self.balance.get(),
978        };
979        QueryOutcome {
980            response,
981            operations: vec![],
982        }
983    }
984
985    /// Returns the messages to open a new chain, and subtracts the new chain's balance
986    /// from this chain's.
987    pub async fn open_chain(
988        &mut self,
989        config: OpenChainConfig,
990        parent: ChainId,
991        block_height: BlockHeight,
992        timestamp: Timestamp,
993        txn_tracker: &mut TransactionTracker,
994    ) -> Result<ChainId, ExecutionError> {
995        let chain_index = txn_tracker.next_chain_index();
996        let chain_origin = ChainOrigin::Child {
997            parent,
998            block_height,
999            chain_index,
1000        };
1001        let init_chain_config = config.init_chain_config(*self.epoch.get());
1002        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
1003        let child_id = chain_description.id();
1004        self.debit(&AccountOwner::CHAIN, config.balance).await?;
1005        let blob = Blob::new_chain_description(&chain_description);
1006        txn_tracker.add_created_blob(blob);
1007        Ok(child_id)
1008    }
1009
1010    pub fn close_chain(&mut self) {
1011        self.closed.set(true);
1012    }
1013
1014    pub async fn create_application(
1015        &mut self,
1016        chain_id: ChainId,
1017        block_height: BlockHeight,
1018        module_id: ModuleId,
1019        parameters: Vec<u8>,
1020        required_application_ids: Vec<ApplicationId>,
1021        txn_tracker: &mut TransactionTracker,
1022    ) -> Result<CreateApplicationResult, ExecutionError> {
1023        let application_index = txn_tracker.next_application_index();
1024
1025        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
1026        // We only remember to register the blobs that aren't recorded in `used_blobs`
1027        // already.
1028        for blob_id in blob_ids {
1029            self.blob_used(txn_tracker, blob_id).await?;
1030        }
1031
1032        let application_description = ApplicationDescription {
1033            module_id,
1034            creator_chain_id: chain_id,
1035            block_height,
1036            application_index,
1037            parameters,
1038            required_application_ids,
1039        };
1040        self.check_required_applications(&application_description, txn_tracker)
1041            .await?;
1042
1043        let blob = Blob::new_application_description(&application_description);
1044        self.used_blobs.insert(&blob.id())?;
1045        txn_tracker.add_created_blob(blob);
1046
1047        Ok(CreateApplicationResult {
1048            app_id: ApplicationId::from(&application_description),
1049        })
1050    }
1051
1052    async fn check_required_applications(
1053        &mut self,
1054        application_description: &ApplicationDescription,
1055        txn_tracker: &mut TransactionTracker,
1056    ) -> Result<(), ExecutionError> {
1057        // Make sure that referenced applications IDs have been registered.
1058        for required_id in &application_description.required_application_ids {
1059            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
1060        }
1061        Ok(())
1062    }
1063
1064    /// Retrieves an application's description.
1065    pub async fn describe_application(
1066        &mut self,
1067        id: ApplicationId,
1068        txn_tracker: &mut TransactionTracker,
1069    ) -> Result<ApplicationDescription, ExecutionError> {
1070        let blob_id = id.description_blob_id();
1071        let content = match txn_tracker.created_blobs().get(&blob_id) {
1072            Some(content) => content.clone(),
1073            None => self.read_blob_content(blob_id).await?,
1074        };
1075        self.blob_used(txn_tracker, blob_id).await?;
1076        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
1077
1078        let blob_ids = self
1079            .check_bytecode_blobs(&description.module_id, txn_tracker)
1080            .await?;
1081        // We only remember to register the blobs that aren't recorded in `used_blobs`
1082        // already.
1083        for blob_id in blob_ids {
1084            self.blob_used(txn_tracker, blob_id).await?;
1085        }
1086
1087        self.check_required_applications(&description, txn_tracker)
1088            .await?;
1089
1090        Ok(description)
1091    }
1092
1093    /// Records a blob that is used in this block. If this is the first use on this chain, creates
1094    /// an oracle response for it.
1095    pub(crate) async fn blob_used(
1096        &mut self,
1097        txn_tracker: &mut TransactionTracker,
1098        blob_id: BlobId,
1099    ) -> Result<bool, ExecutionError> {
1100        if self.used_blobs.contains(&blob_id).await? {
1101            return Ok(false); // Nothing to do.
1102        }
1103        self.used_blobs.insert(&blob_id)?;
1104        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1105        Ok(true)
1106    }
1107
1108    /// Records a blob that is published in this block. This does not create an oracle entry, and
1109    /// the blob can be used without using an oracle in the future on this chain.
1110    fn blob_published(
1111        &mut self,
1112        blob_id: &BlobId,
1113        txn_tracker: &mut TransactionTracker,
1114    ) -> Result<(), ExecutionError> {
1115        self.used_blobs.insert(blob_id)?;
1116        txn_tracker.add_published_blob(*blob_id);
1117        Ok(())
1118    }
1119
1120    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1121        match self.context().extra().get_blob(blob_id).await {
1122            Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
1123            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1124            Err(error) => Err(error.into()),
1125        }
1126    }
1127
1128    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1129        if self.context().extra().contains_blob(blob_id).await? {
1130            Ok(())
1131        } else {
1132            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1133        }
1134    }
1135
1136    async fn check_bytecode_blobs(
1137        &self,
1138        module_id: &ModuleId,
1139        txn_tracker: &TransactionTracker,
1140    ) -> Result<Vec<BlobId>, ExecutionError> {
1141        let blob_ids = module_id.bytecode_blob_ids();
1142
1143        let mut missing_blobs = Vec::new();
1144        for blob_id in &blob_ids {
1145            // First check if blob is present in created_blobs
1146            if txn_tracker.created_blobs().contains_key(blob_id) {
1147                continue; // Blob found in created_blobs, it's ok
1148            }
1149            // If not in created_blobs, check storage
1150            if !self.context().extra().contains_blob(*blob_id).await? {
1151                missing_blobs.push(*blob_id);
1152            }
1153        }
1154        ensure!(
1155            missing_blobs.is_empty(),
1156            ExecutionError::BlobsNotFound(missing_blobs)
1157        );
1158
1159        Ok(blob_ids)
1160    }
1161}