linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet},
11    sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{
19        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
21    },
22    ensure, hex_debug,
23    identifiers::{
24        Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, OwnerSpender, StreamId,
25    },
26    ownership::{ChainOwnership, TimeoutConfig},
27};
28use linera_views::{
29    context::Context,
30    lazy_register_view::LazyRegisterView,
31    map_view::MapView,
32    register_view::RegisterView,
33    set_view::SetView,
34    views::{ClonableView, ReplaceContext, View},
35    ViewError,
36};
37use serde::{Deserialize, Serialize};
38
39#[cfg(test)]
40use crate::test_utils::SystemExecutionState;
41use crate::{
42    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
43    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
44    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
45};
46
/// The event stream name for new epochs and committees.
///
/// Events on this stream are indexed by the epoch number and carry a BCS-serialized
/// [`EpochEventData`] as payload.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// The event stream name for removed epochs.
///
/// Events on this stream are indexed by the epoch number and carry an empty payload.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
51
/// The data stored in an epoch creation event.
///
/// This is serialized with BCS and published on the [`EPOCH_STREAM_NAME`] stream when a
/// committee is created; chains processing a new epoch deserialize it again to learn which
/// committee blob belongs to that epoch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EpochEventData {
    /// The hash of the committee blob for this epoch.
    pub blob_hash: CryptoHash,
    /// The timestamp when the epoch was created on the admin chain.
    pub timestamp: Timestamp,
}
60
/// Prometheus metrics recorded while executing system operations.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The number of times the `SystemOperation::OpenChain` operation was executed.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
77
/// A view accessing the execution state of the system of a chain.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct SystemExecutionStateView<C> {
    /// How the chain was created. May be unknown for inactive chains.
    pub description: LazyRegisterView<C, Option<ChainDescription>>,
    /// The number identifying the current configuration.
    pub epoch: RegisterView<C, Epoch>,
    /// The ID of the admin chain, if set. Admin operations are only accepted when this
    /// equals the ID of the chain executing them.
    pub admin_chain_id: RegisterView<C, Option<ChainId>>,
    /// The blob hashes of the committees we trust, indexed by epoch number.
    // Not using a `MapView` because the set of active committees is supposed to be
    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
    // (e.g. the `OpenChain` operation). The actual committees live in the blob store
    // and are cached process-wide via `SharedCommittees`.
    pub committees: RegisterView<C, BTreeMap<Epoch, CryptoHash>>,
    /// Ownership of the chain.
    pub ownership: LazyRegisterView<C, ChainOwnership>,
    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
    pub balance: RegisterView<C, Amount>,
    /// Balances attributed to a given owner.
    pub balances: MapView<C, AccountOwner, Amount>,
    /// Allowances for spending from one account by another, keyed by the owner/spender pair.
    pub allowances: MapView<C, OwnerSpender, Amount>,
    /// The timestamp of the most recent block.
    pub timestamp: RegisterView<C, Timestamp>,
    /// Whether this chain has been closed.
    pub closed: RegisterView<C, bool>,
    /// Permissions for applications on this chain.
    pub application_permissions: LazyRegisterView<C, ApplicationPermissions>,
    /// Blobs that have been used or published on this chain.
    pub used_blobs: SetView<C, BlobId>,
    /// The event stream subscriptions of applications on this chain, keyed by the chain and
    /// stream that the events belong to.
    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
    /// The number of events in the streams that this chain is writing to.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
115
116impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
117    type Target = SystemExecutionStateView<C2>;
118
119    async fn with_context(
120        &mut self,
121        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
122    ) -> Self::Target {
123        SystemExecutionStateView {
124            description: self.description.with_context(ctx.clone()).await,
125            epoch: self.epoch.with_context(ctx.clone()).await,
126            admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
127            committees: self.committees.with_context(ctx.clone()).await,
128            ownership: self.ownership.with_context(ctx.clone()).await,
129            balance: self.balance.with_context(ctx.clone()).await,
130            balances: self.balances.with_context(ctx.clone()).await,
131            allowances: self.allowances.with_context(ctx.clone()).await,
132            timestamp: self.timestamp.with_context(ctx.clone()).await,
133            closed: self.closed.with_context(ctx.clone()).await,
134            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
135            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
136            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
137            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
138        }
139    }
140}
141
/// The applications subscribing to a particular stream, and the next event index.
///
/// The `Default` value (index `0`, no applications) is what a stream starts from the first
/// time it is looked up.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
pub struct EventSubscriptions {
    /// The next event index, i.e. the total number of events in this stream that have already
    /// been processed by this chain.
    pub next_index: u32,
    /// The applications that are subscribed to this stream.
    pub applications: BTreeSet<ApplicationId>,
}
151
/// The initial configuration for a new chain.
///
/// This is the payload of [`SystemOperation::OpenChain`]. Combined with epoch information
/// it yields an [`InitialChainConfig`]; see [`OpenChainConfig::init_chain_config`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial chain balance.
    pub balance: Amount,
    /// The initial application permissions.
    pub application_permissions: ApplicationPermissions,
}
162
163impl OpenChainConfig {
164    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
165    /// parameters.
166    pub fn init_chain_config(
167        &self,
168        epoch: Epoch,
169        min_active_epoch: Epoch,
170        max_active_epoch: Epoch,
171    ) -> InitialChainConfig {
172        InitialChainConfig {
173            application_permissions: self.application_permissions.clone(),
174            balance: self.balance,
175            epoch,
176            min_active_epoch,
177            max_active_epoch,
178            ownership: self.ownership.clone(),
179        }
180    }
181}
182
/// A system operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemOperation {
    /// Transfers `amount` units of value from the given owner's account to the recipient.
    /// If `owner` is [`AccountOwner::CHAIN`], the units are taken out of the chain's
    /// unattributed balance instead, which requires the signer to be a chain owner.
    Transfer {
        owner: AccountOwner,
        recipient: Account,
        amount: Amount,
    },
    /// Claims `amount` units of value from the given owner's account in the remote
    /// `target_id` chain. Depending on its configuration, the target chain may refuse to
    /// process the message.
    Claim {
        owner: AccountOwner,
        target_id: ChainId,
        recipient: Account,
        amount: Amount,
    },
    /// Creates (or activates) a new chain.
    /// This will automatically subscribe to the future committees created by `admin_chain_id`.
    OpenChain(OpenChainConfig),
    /// Closes the chain.
    CloseChain,
    /// Changes the ownership of the chain.
    ChangeOwnership {
        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The regular owners, with their weights that determine how often they are round leader.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// The leader of the first single-leader round. If not set, this is random like other rounds.
        #[debug(skip_if = Option::is_none)]
        first_leader: Option<AccountOwner>,
        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
        multi_leader_rounds: u32,
        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
        /// This should only be `true` on chains with restrictive application permissions and an
        /// application-based mechanism to select block proposers.
        open_multi_leader_rounds: bool,
        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
        timeout_config: TimeoutConfig,
    },
    /// Changes the application permissions configuration on this chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists. Otherwise the block fails.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        module_id: ModuleId,
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes an event about a new epoch and committee.
    /// The epoch must be exactly one higher than the chain's current epoch.
    ProcessNewEpoch(Epoch),
    /// Processes an event about a removed epoch and committee.
    ProcessRemovedEpoch(Epoch),
    /// Updates the event stream trackers.
    /// Each entry is a chain ID, a stream ID and the new next event index for that stream;
    /// the index must be strictly greater than the one currently recorded.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
256
/// Operations that are only allowed on the admin chain.
///
/// Executing any of these on a chain whose ID differs from its recorded admin chain ID
/// fails with `ExecutionError::AdminOperationOnNonAdminChain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum AdminOperation {
    /// Publishes a new committee as a blob. This can be assigned to an epoch using
    /// [`AdminOperation::CreateCommittee`] in a later block.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
    /// [`SystemOperation::ProcessNewEpoch`].
    ///
    /// The `epoch` must be exactly one higher than the admin chain's current epoch, and
    /// `blob_hash` must refer to a committee blob that can be loaded.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
    /// re-certified) by a block certified by a recent committee.
    ///
    /// Fails with `ExecutionError::InvalidCommitteeRemoval` if no committee is registered for
    /// `epoch`.
    RemoveCommittee { epoch: Epoch },
}
271
/// A system message meant to be executed on a remote chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target` -- unless the message is
    /// bouncing, in which case `source` is credited instead.
    ///
    /// Sent as a tracked message when a transfer's recipient lives on another chain.
    Credit {
        target: AccountOwner,
        amount: Amount,
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account and starts a transfer to credit
    /// the recipient. The message must be properly authenticated. Receiver chains may
    /// refuse it depending on their configuration.
    ///
    /// Sent by a claim whose target is another chain.
    Withdraw {
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    },
}
291
/// A query to the system state.
///
/// This is a unit struct: the query carries no parameters.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
295
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// A chain ID -- presumably that of the queried chain; confirm against the query handler.
    pub chain_id: ChainId,
    /// A balance -- presumably the chain's unattributed balance; confirm against the query
    /// handler.
    pub balance: Amount,
}
302
/// Optional user message attached to a transfer.
///
/// When present, the message is exactly 32 bytes long; the `Default` value is `None`.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
306
/// The outcome of creating an application, as returned by `create_application`.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
}
311
312impl<C> SystemExecutionStateView<C>
313where
314    C: Context + Clone + 'static,
315    C::Extra: ExecutionRuntimeContext,
316{
317    /// Invariant for the states of active chains.
318    pub async fn is_active(&self) -> Result<bool, ViewError> {
319        Ok(self.description.get().await?.is_some()
320            && self.ownership.get().await?.is_active()
321            && self.committees.get().contains_key(self.epoch.get())
322            && self.admin_chain_id.get().is_some())
323    }
324
325    /// Returns the current committee, if any.
326    pub async fn current_committee(
327        &self,
328    ) -> Result<Option<(Epoch, Arc<Committee>)>, ExecutionError> {
329        let epoch = *self.epoch.get();
330        let Some(&hash) = self.committees.get().get(&epoch) else {
331            return Ok(None);
332        };
333        let committee = self
334            .context()
335            .extra()
336            .get_or_load_committee_by_hash(hash)
337            .await?;
338        Ok(Some((epoch, committee)))
339    }
340
341    async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
342        match self.context().extra().get_event(event_id.clone()).await? {
343            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
344            Some(vec) => Ok(vec),
345        }
346    }
347
    /// Executes the sender's side of a system operation.
    ///
    /// Outgoing messages, events, and streams to process are recorded in `txn_tracker`
    /// rather than returned.
    ///
    /// Returns `Some((application_id, instantiation_argument))` when the operation was
    /// [`SystemOperation::CreateApplication`], and `None` for every other operation.
    pub async fn execute_operation(
        &mut self,
        context: OperationContext,
        operation: SystemOperation,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
        use SystemOperation::*;
        // Only set by `CreateApplication`.
        let mut new_application = None;
        match operation {
            OpenChain(config) => {
                // The ID of the new chain is not needed here.
                let _chain_id = self
                    .open_chain(
                        config,
                        context.chain_id,
                        context.height,
                        context.timestamp,
                        txn_tracker,
                    )
                    .await?;
                #[cfg(with_metrics)]
                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
            }
            ChangeOwnership {
                super_owners,
                owners,
                first_leader,
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            } => {
                self.ownership.set(ChainOwnership {
                    super_owners: super_owners.into_iter().collect(),
                    owners: owners.into_iter().collect(),
                    first_leader,
                    multi_leader_rounds,
                    open_multi_leader_rounds,
                    timeout_config,
                });
            }
            ChangeApplicationPermissions(application_permissions) => {
                self.application_permissions.set(application_permissions);
            }
            CloseChain => self.close_chain(),
            Transfer {
                owner,
                amount,
                recipient,
            } => {
                // `None`: a system operation carries no authenticated application ID.
                let maybe_message = self
                    .transfer(context.authenticated_owner, None, owner, recipient, amount)
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Claim {
                owner,
                target_id,
                recipient,
                amount,
            } => {
                // `None`: a system operation carries no authenticated application ID.
                let maybe_message = self
                    .claim(
                        context.authenticated_owner,
                        None,
                        owner,
                        target_id,
                        recipient,
                        amount,
                    )
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Admin(admin_operation) => {
                // Admin operations are rejected unless this chain is its own admin chain.
                ensure!(
                    *self.admin_chain_id.get() == Some(context.chain_id),
                    ExecutionError::AdminOperationOnNonAdminChain
                );
                match admin_operation {
                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
                        self.blob_published(
                            &BlobId::new(blob_hash, BlobType::Committee),
                            txn_tracker,
                        )?;
                    }
                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
                        self.check_next_epoch(epoch)?;
                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                        // Validate that the blob exists and deserializes as a Committee.
                        self.context()
                            .extra()
                            .get_or_load_committee_by_hash(blob_hash)
                            .await?;
                        self.blob_used(txn_tracker, blob_id).await?;
                        self.committees.get_mut().insert(epoch, blob_hash);
                        self.epoch.set(epoch);
                        // Announce the new epoch, indexed by the epoch number.
                        let event_data = EpochEventData {
                            blob_hash,
                            timestamp: context.timestamp,
                        };
                        txn_tracker.add_event(
                            StreamId::system(EPOCH_STREAM_NAME),
                            epoch.0,
                            bcs::to_bytes(&event_data)?,
                        );
                    }
                    AdminOperation::RemoveCommittee { epoch } => {
                        ensure!(
                            self.committees.get_mut().remove(&epoch).is_some(),
                            ExecutionError::InvalidCommitteeRemoval
                        );
                        // The removal event carries no payload.
                        txn_tracker.add_event(
                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                            epoch.0,
                            vec![],
                        );
                    }
                }
            }
            PublishModule { module_id } => {
                for blob_id in module_id.bytecode_blob_ids() {
                    self.blob_published(&blob_id, txn_tracker)?;
                }
            }
            CreateApplication {
                module_id,
                parameters,
                instantiation_argument,
                required_application_ids,
            } => {
                let CreateApplicationResult { app_id } = self
                    .create_application(
                        context.chain_id,
                        context.height,
                        module_id,
                        parameters,
                        required_application_ids,
                        txn_tracker,
                    )
                    .await?;
                // Hand the instantiation argument back to the caller with the new ID.
                new_application = Some((app_id, instantiation_argument));
            }
            PublishDataBlob { blob_hash } => {
                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
            }
            VerifyBlob { blob_id } => {
                self.assert_blob_exists(blob_id).await?;
                resource_controller
                    .with_state(self)
                    .await?
                    .track_blob_read(0)?;
                self.blob_used(txn_tracker, blob_id).await?;
            }
            ProcessNewEpoch(epoch) => {
                self.check_next_epoch(epoch)?;
                let admin_chain_id = self
                    .admin_chain_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The epoch creation event on the admin chain, indexed by the epoch number.
                let event_id = EventId {
                    chain_id: admin_chain_id,
                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                let bytes = txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(
                            event_id.clone(),
                            Arc::unwrap_or_clone(bytes),
                        ))
                    })
                    .await?
                    .to_event(&event_id)?;
                let event_data: EpochEventData = bcs::from_bytes(&bytes)?;
                let blob_id = BlobId::new(event_data.blob_hash, BlobType::Committee);
                // Validate that the blob exists and deserializes as a Committee.
                self.context()
                    .extra()
                    .get_or_load_committee_by_hash(event_data.blob_hash)
                    .await?;
                self.blob_used(txn_tracker, blob_id).await?;
                self.committees
                    .get_mut()
                    .insert(epoch, event_data.blob_hash);
                self.epoch.set(epoch);
            }
            ProcessRemovedEpoch(epoch) => {
                ensure!(
                    self.committees.get_mut().remove(&epoch).is_some(),
                    ExecutionError::InvalidCommitteeRemoval
                );
                let admin_chain_id = self
                    .admin_chain_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The epoch removal event on the admin chain, indexed by the epoch number.
                let event_id = EventId {
                    chain_id: admin_chain_id,
                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // Only the lookup matters here; the oracle's result is discarded.
                txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(event_id, Arc::unwrap_or_clone(bytes)))
                    })
                    .await?;
            }
            UpdateStreams(streams) => {
                let mut missing_events = Vec::new();
                for (chain_id, stream_id, next_index) in streams {
                    let subscriptions = self
                        .event_subscriptions
                        .get_mut_or_default(&(chain_id, stream_id.clone()))
                        .await?;
                    // The tracker must strictly advance; this also implies `next_index >= 1`.
                    ensure!(
                        subscriptions.next_index < next_index,
                        ExecutionError::OutdatedUpdateStreams
                    );
                    // Schedule the range of new events for every subscribed application.
                    for application_id in &subscriptions.applications {
                        txn_tracker.add_stream_to_process(
                            *application_id,
                            chain_id,
                            stream_id.clone(),
                            subscriptions.next_index,
                            next_index,
                        );
                    }
                    subscriptions.next_index = next_index;
                    // Only the last event of the new range is checked for existence.
                    let index = next_index
                        .checked_sub(1)
                        .ok_or(ArithmeticError::Underflow)?;
                    let event_id = EventId {
                        chain_id,
                        stream_id,
                        index,
                    };
                    let context = self.context();
                    let extra = context.extra();
                    txn_tracker
                        .oracle(|| async {
                            if !extra.contains_event(event_id.clone()).await? {
                                missing_events.push(event_id.clone());
                            }
                            Ok(OracleResponse::EventExists(event_id))
                        })
                        .await?;
                }
                // Report all missing events at once, after every stream has been checked.
                ensure!(
                    missing_events.is_empty(),
                    ExecutionError::EventsNotFound(missing_events)
                );
            }
        }

        Ok(new_application)
    }
606
607    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
608    /// epoch.
609    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
610        let expected = self.epoch.get().try_add_one()?;
611        ensure!(
612            provided == expected,
613            ExecutionError::InvalidCommitteeEpoch { provided, expected }
614        );
615        Ok(())
616    }
617
618    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
619        if owner == &AccountOwner::CHAIN {
620            let new_balance = self.balance.get().saturating_add(amount);
621            self.balance.set(new_balance);
622        } else {
623            let balance = self.balances.get_mut_or_default(owner).await?;
624            *balance = balance.saturating_add(amount);
625        }
626        Ok(())
627    }
628
629    async fn credit_or_send_message(
630        &mut self,
631        source: AccountOwner,
632        recipient: Account,
633        amount: Amount,
634    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
635        let source_chain_id = self.context().extra().chain_id();
636        if recipient.chain_id == source_chain_id {
637            // Handle same-chain transfer locally.
638            let target = recipient.owner;
639            self.credit(&target, amount).await?;
640            Ok(None)
641        } else {
642            // Handle cross-chain transfer with message.
643            let message = SystemMessage::Credit {
644                amount,
645                source,
646                target: recipient.owner,
647            };
648            Ok(Some(
649                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
650            ))
651        }
652    }
653
654    pub async fn transfer(
655        &mut self,
656        authenticated_owner: Option<AccountOwner>,
657        authenticated_application_id: Option<ApplicationId>,
658        source: AccountOwner,
659        recipient: Account,
660        amount: Amount,
661    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
662        if source == AccountOwner::CHAIN {
663            ensure!(
664                authenticated_owner.is_some()
665                    && self
666                        .ownership
667                        .get()
668                        .await?
669                        .is_owner(&authenticated_owner.unwrap()),
670                ExecutionError::UnauthenticatedTransferOwner
671            );
672        } else {
673            ensure!(
674                authenticated_owner == Some(source)
675                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
676                ExecutionError::UnauthenticatedTransferOwner
677            );
678        }
679        ensure!(
680            amount > Amount::ZERO,
681            ExecutionError::IncorrectTransferAmount
682        );
683        self.debit(&source, amount).await?;
684        self.credit_or_send_message(source, recipient, amount).await
685    }
686
687    pub async fn claim(
688        &mut self,
689        authenticated_owner: Option<AccountOwner>,
690        authenticated_application_id: Option<ApplicationId>,
691        source: AccountOwner,
692        target_id: ChainId,
693        recipient: Account,
694        amount: Amount,
695    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
696        ensure!(
697            authenticated_owner == Some(source)
698                || authenticated_application_id.map(AccountOwner::from) == Some(source),
699            ExecutionError::UnauthenticatedClaimOwner
700        );
701        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
702
703        let current_chain_id = self.context().extra().chain_id();
704        if target_id == current_chain_id {
705            // Handle same-chain claim locally by processing the withdraw operation directly
706            self.debit(&source, amount).await?;
707            self.credit_or_send_message(source, recipient, amount).await
708        } else {
709            // Handle cross-chain claim with Withdraw message
710            let message = SystemMessage::Withdraw {
711                amount,
712                owner: source,
713                recipient,
714            };
715            Ok(Some(
716                OutgoingMessage::new(target_id, message)
717                    .with_authenticated_owner(authenticated_owner),
718            ))
719        }
720    }
721
722    pub async fn approve(
723        &mut self,
724        authenticated_owner: Option<AccountOwner>,
725        authenticated_application_id: Option<ApplicationId>,
726        owner: AccountOwner,
727        spender: AccountOwner,
728        amount: Amount,
729    ) -> Result<(), ExecutionError> {
730        ensure!(
731            authenticated_owner == Some(owner)
732                || authenticated_application_id.map(AccountOwner::from) == Some(owner),
733            ExecutionError::UnauthenticatedTransferOwner
734        );
735
736        let owner_spender = OwnerSpender::new(owner, spender);
737        if amount == Amount::ZERO {
738            self.allowances.remove(&owner_spender)?;
739            return Ok(());
740        }
741        let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
742        *allowance = amount;
743
744        Ok(())
745    }
746
747    pub async fn transfer_from(
748        &mut self,
749        authenticated_owner: Option<AccountOwner>,
750        authenticated_application_id: Option<ApplicationId>,
751        owner: AccountOwner,
752        spender: AccountOwner,
753        recipient: Account,
754        amount: Amount,
755    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
756        ensure!(
757            authenticated_owner == Some(spender)
758                || authenticated_application_id.map(AccountOwner::from) == Some(spender),
759            ExecutionError::UnauthenticatedTransferOwner
760        );
761        ensure!(
762            amount > Amount::ZERO,
763            ExecutionError::IncorrectTransferAmount
764        );
765
766        // Debit from allowance
767        let owner_spender = OwnerSpender::new(owner, spender);
768        let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
769
770        allowance
771            .try_sub_assign(amount)
772            .map_err(|_| ExecutionError::InsufficientAllowance {
773                allowance: *allowance,
774                owner,
775                spender,
776            })?;
777
778        if allowance.is_zero() {
779            self.allowances.remove(&owner_spender)?;
780        }
781
782        // Debit from owner's balance
783        self.debit(&owner, amount).await?;
784
785        // Credit or send message
786        self.credit_or_send_message(owner, recipient, amount).await
787    }
788
789    /// Debits an [`Amount`] of tokens from an account's balance.
790    async fn debit(
791        &mut self,
792        account: &AccountOwner,
793        amount: Amount,
794    ) -> Result<(), ExecutionError> {
795        let balance = if account == &AccountOwner::CHAIN {
796            self.balance.get_mut()
797        } else {
798            self.balances.get_mut(account).await?.ok_or_else(|| {
799                ExecutionError::InsufficientBalance {
800                    balance: Amount::ZERO,
801                    account: *account,
802                }
803            })?
804        };
805
806        balance
807            .try_sub_assign(amount)
808            .map_err(|_| ExecutionError::InsufficientBalance {
809                balance: *balance,
810                account: *account,
811            })?;
812
813        if account != &AccountOwner::CHAIN && balance.is_zero() {
814            self.balances.remove(account)?;
815        }
816
817        Ok(())
818    }
819
820    /// Executes a cross-chain message that represents the recipient's side of an operation.
821    pub async fn execute_message(
822        &mut self,
823        context: MessageContext,
824        message: SystemMessage,
825    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
826        let mut outcome = Vec::new();
827        use SystemMessage::*;
828        match message {
829            Credit {
830                amount,
831                source,
832                target,
833            } => {
834                let receiver = if context.is_bouncing { source } else { target };
835                self.credit(&receiver, amount).await?;
836            }
837            Withdraw {
838                amount,
839                owner,
840                recipient,
841            } => {
842                self.debit(&owner, amount).await?;
843                if let Some(message) = self
844                    .credit_or_send_message(owner, recipient, amount)
845                    .await?
846                {
847                    outcome.push(message);
848                }
849            }
850        }
851        Ok(outcome)
852    }
853
854    /// Initializes the system application state on a newly opened chain.
855    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
856    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
857        if self.description.get().await?.is_some() {
858            // already initialized
859            return Ok(true);
860        }
861        let description_blob = self
862            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
863            .await?;
864        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
865        let InitialChainConfig {
866            ownership,
867            epoch,
868            balance,
869            min_active_epoch,
870            max_active_epoch,
871            application_permissions,
872        } = description.config().clone();
873        self.timestamp.set(description.timestamp());
874        self.description.set(Some(description));
875        self.epoch.set(epoch);
876
877        let committees = self
878            .context()
879            .extra()
880            .get_committee_hashes(min_active_epoch..=max_active_epoch)
881            .await?;
882        let admin_chain_id = self
883            .context()
884            .extra()
885            .get_network_description()
886            .await?
887            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
888            .admin_chain_id;
889
890        self.committees.set(committees);
891        self.admin_chain_id.set(Some(admin_chain_id));
892        self.ownership.set(ownership);
893        self.balance.set(balance);
894        self.application_permissions.set(application_permissions);
895        Ok(false)
896    }
897
898    pub fn handle_query(
899        &mut self,
900        context: QueryContext,
901        _query: SystemQuery,
902    ) -> QueryOutcome<SystemResponse> {
903        let response = SystemResponse {
904            chain_id: context.chain_id,
905            balance: *self.balance.get(),
906        };
907        QueryOutcome {
908            response,
909            operations: vec![],
910        }
911    }
912
913    /// Returns the messages to open a new chain, and subtracts the new chain's balance
914    /// from this chain's.
915    pub async fn open_chain(
916        &mut self,
917        config: OpenChainConfig,
918        parent: ChainId,
919        block_height: BlockHeight,
920        timestamp: Timestamp,
921        txn_tracker: &mut TransactionTracker,
922    ) -> Result<ChainId, ExecutionError> {
923        let chain_index = txn_tracker.next_chain_index();
924        let chain_origin = ChainOrigin::Child {
925            parent,
926            block_height,
927            chain_index,
928        };
929        let committees = self.committees.get();
930        let init_chain_config = config.init_chain_config(
931            *self.epoch.get(),
932            committees.keys().min().copied().unwrap_or(Epoch::ZERO),
933            committees.keys().max().copied().unwrap_or(Epoch::ZERO),
934        );
935        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
936        let child_id = chain_description.id();
937        self.debit(&AccountOwner::CHAIN, config.balance).await?;
938        let blob = Blob::new_chain_description(&chain_description);
939        txn_tracker.add_created_blob(blob);
940        Ok(child_id)
941    }
942
943    pub fn close_chain(&mut self) {
944        self.closed.set(true);
945    }
946
947    pub async fn create_application(
948        &mut self,
949        chain_id: ChainId,
950        block_height: BlockHeight,
951        module_id: ModuleId,
952        parameters: Vec<u8>,
953        required_application_ids: Vec<ApplicationId>,
954        txn_tracker: &mut TransactionTracker,
955    ) -> Result<CreateApplicationResult, ExecutionError> {
956        let application_index = txn_tracker.next_application_index();
957
958        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
959        // We only remember to register the blobs that aren't recorded in `used_blobs`
960        // already.
961        for blob_id in blob_ids {
962            self.blob_used(txn_tracker, blob_id).await?;
963        }
964
965        let application_description = ApplicationDescription {
966            module_id,
967            creator_chain_id: chain_id,
968            block_height,
969            application_index,
970            parameters,
971            required_application_ids,
972        };
973        self.check_required_applications(&application_description, txn_tracker)
974            .await?;
975
976        let blob = Blob::new_application_description(&application_description);
977        self.used_blobs.insert(&blob.id())?;
978        txn_tracker.add_created_blob(blob);
979
980        Ok(CreateApplicationResult {
981            app_id: ApplicationId::from(&application_description),
982        })
983    }
984
985    async fn check_required_applications(
986        &mut self,
987        application_description: &ApplicationDescription,
988        txn_tracker: &mut TransactionTracker,
989    ) -> Result<(), ExecutionError> {
990        // Make sure that referenced applications IDs have been registered.
991        for required_id in &application_description.required_application_ids {
992            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
993        }
994        Ok(())
995    }
996
997    /// Retrieves an application's description.
998    pub async fn describe_application(
999        &mut self,
1000        id: ApplicationId,
1001        txn_tracker: &mut TransactionTracker,
1002    ) -> Result<ApplicationDescription, ExecutionError> {
1003        let blob_id = id.description_blob_id();
1004        let content = match txn_tracker.created_blobs().get(&blob_id) {
1005            Some(content) => content.clone(),
1006            None => self.read_blob_content(blob_id).await?,
1007        };
1008        self.blob_used(txn_tracker, blob_id).await?;
1009        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
1010
1011        let blob_ids = self
1012            .check_bytecode_blobs(&description.module_id, txn_tracker)
1013            .await?;
1014        // We only remember to register the blobs that aren't recorded in `used_blobs`
1015        // already.
1016        for blob_id in blob_ids {
1017            self.blob_used(txn_tracker, blob_id).await?;
1018        }
1019
1020        self.check_required_applications(&description, txn_tracker)
1021            .await?;
1022
1023        Ok(description)
1024    }
1025
1026    /// Records a blob that is used in this block. If this is the first use on this chain, creates
1027    /// an oracle response for it.
1028    pub(crate) async fn blob_used(
1029        &mut self,
1030        txn_tracker: &mut TransactionTracker,
1031        blob_id: BlobId,
1032    ) -> Result<bool, ExecutionError> {
1033        if self.used_blobs.contains(&blob_id).await? {
1034            return Ok(false); // Nothing to do.
1035        }
1036        self.used_blobs.insert(&blob_id)?;
1037        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1038        Ok(true)
1039    }
1040
1041    /// Records a blob that is published in this block. This does not create an oracle entry, and
1042    /// the blob can be used without using an oracle in the future on this chain.
1043    fn blob_published(
1044        &mut self,
1045        blob_id: &BlobId,
1046        txn_tracker: &mut TransactionTracker,
1047    ) -> Result<(), ExecutionError> {
1048        self.used_blobs.insert(blob_id)?;
1049        txn_tracker.add_published_blob(*blob_id);
1050        Ok(())
1051    }
1052
1053    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1054        match self.context().extra().get_blob(blob_id).await {
1055            Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
1056            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1057            Err(error) => Err(error.into()),
1058        }
1059    }
1060
1061    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1062        if self.context().extra().contains_blob(blob_id).await? {
1063            Ok(())
1064        } else {
1065            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1066        }
1067    }
1068
1069    async fn check_bytecode_blobs(
1070        &self,
1071        module_id: &ModuleId,
1072        txn_tracker: &TransactionTracker,
1073    ) -> Result<Vec<BlobId>, ExecutionError> {
1074        let blob_ids = module_id.bytecode_blob_ids();
1075
1076        let mut missing_blobs = Vec::new();
1077        for blob_id in &blob_ids {
1078            // First check if blob is present in created_blobs
1079            if txn_tracker.created_blobs().contains_key(blob_id) {
1080                continue; // Blob found in created_blobs, it's ok
1081            }
1082            // If not in created_blobs, check storage
1083            if !self.context().extra().contains_blob(*blob_id).await? {
1084                missing_blobs.push(*blob_id);
1085            }
1086        }
1087        ensure!(
1088            missing_blobs.is_empty(),
1089            ExecutionError::BlobsNotFound(missing_blobs)
1090        );
1091
1092        Ok(blob_ids)
1093    }
1094}