1#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10 collections::{BTreeMap, BTreeSet},
11 sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17 crypto::CryptoHash,
18 data_types::{
19 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20 ChainDescription, ChainOrigin, Cursor, Epoch, InitialChainConfig, OracleResponse,
21 Timestamp,
22 },
23 ensure, hex_debug,
24 identifiers::{
25 Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, OwnerSpender, StreamId,
26 },
27 ownership::{ChainOwnership, TimeoutConfig},
28};
29use linera_views::{
30 context::Context,
31 lazy_register_view::LazyRegisterView,
32 map_view::MapView,
33 register_view::RegisterView,
34 set_view::SetView,
35 views::{ClonableView, ReplaceContext, View},
36 ViewError,
37};
38use serde::{Deserialize, Serialize};
39
40#[cfg(test)]
41use crate::test_utils::SystemExecutionState;
42use crate::{
43 committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
44 ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
45 OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
46};
47
48pub static EPOCH_STREAM_NAME: &[u8] = &[0];
50pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct EpochEventData {
56 pub blob_hash: CryptoHash,
58 pub timestamp: Timestamp,
60}
61
62#[cfg(with_metrics)]
64mod metrics {
65 use std::sync::LazyLock;
66
67 use linera_base::prometheus_util::register_int_counter_vec;
68 use prometheus::IntCounterVec;
69
70 pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
71 register_int_counter_vec(
72 "open_chain_count",
73 "The number of times the `OpenChain` operation was executed",
74 &[],
75 )
76 });
77}
78
79#[derive(Debug, ClonableView, View, Allocative)]
81#[allocative(bound = "C")]
82pub struct SystemExecutionStateView<C> {
83 pub description: LazyRegisterView<C, Option<ChainDescription>>,
85 pub epoch: RegisterView<C, Epoch>,
87 pub admin_chain_id: RegisterView<C, Option<ChainId>>,
89 pub committee_hash: RegisterView<C, Option<CryptoHash>>,
92 pub ownership: LazyRegisterView<C, ChainOwnership>,
94 pub balance: RegisterView<C, Amount>,
96 pub balances: MapView<C, AccountOwner, Amount>,
98 pub allowances: MapView<C, OwnerSpender, Amount>,
100 pub timestamp: RegisterView<C, Timestamp>,
102 pub closed: RegisterView<C, bool>,
104 pub application_permissions: LazyRegisterView<C, ApplicationPermissions>,
106 pub used_blobs: SetView<C, BlobId>,
108 pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
110 pub stream_event_counts: MapView<C, StreamId, u32>,
112 pub finalized_sent_messages: MapView<C, ChainId, (Cursor, CryptoHash)>,
117 pub unfinalized_message_blocks: MapView<C, ChainId, BTreeSet<Cursor>>,
130 pub pending_checkpoint_ack_targets: SetView<C, ChainId>,
136}
137
138impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
139 type Target = SystemExecutionStateView<C2>;
140
141 async fn with_context(
142 &mut self,
143 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
144 ) -> Self::Target {
145 SystemExecutionStateView {
146 description: self.description.with_context(ctx.clone()).await,
147 epoch: self.epoch.with_context(ctx.clone()).await,
148 admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
149 committee_hash: self.committee_hash.with_context(ctx.clone()).await,
150 ownership: self.ownership.with_context(ctx.clone()).await,
151 balance: self.balance.with_context(ctx.clone()).await,
152 balances: self.balances.with_context(ctx.clone()).await,
153 allowances: self.allowances.with_context(ctx.clone()).await,
154 timestamp: self.timestamp.with_context(ctx.clone()).await,
155 closed: self.closed.with_context(ctx.clone()).await,
156 application_permissions: self.application_permissions.with_context(ctx.clone()).await,
157 used_blobs: self.used_blobs.with_context(ctx.clone()).await,
158 event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
159 stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
160 finalized_sent_messages: self.finalized_sent_messages.with_context(ctx.clone()).await,
161 unfinalized_message_blocks: self
162 .unfinalized_message_blocks
163 .with_context(ctx.clone())
164 .await,
165 pending_checkpoint_ack_targets: self
166 .pending_checkpoint_ack_targets
167 .with_context(ctx.clone())
168 .await,
169 }
170 }
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, Allocative)]
175pub struct EventSubscriptions {
176 pub min_next_index: u32,
180 pub applications: BTreeMap<ApplicationId, u32>,
183}
184
185impl Default for EventSubscriptions {
186 fn default() -> Self {
187 Self {
188 min_next_index: u32::MAX,
189 applications: BTreeMap::new(),
190 }
191 }
192}
193
194impl EventSubscriptions {
195 pub(crate) fn recalculate_min(&mut self) {
196 self.min_next_index = self
197 .applications
198 .values()
199 .copied()
200 .min()
201 .unwrap_or(u32::MAX);
202 }
203}
204
205#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
207pub struct OpenChainConfig {
208 pub ownership: ChainOwnership,
210 pub balance: Amount,
212 pub application_permissions: ApplicationPermissions,
214}
215
216impl OpenChainConfig {
217 pub fn init_chain_config(&self, epoch: Epoch) -> InitialChainConfig {
220 InitialChainConfig {
221 application_permissions: self.application_permissions.clone(),
222 balance: self.balance,
223 epoch,
224 ownership: self.ownership.clone(),
225 }
226 }
227}
228
229#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
231pub enum SystemOperation {
232 Transfer {
235 owner: AccountOwner,
236 recipient: Account,
237 amount: Amount,
238 },
239 Claim {
243 owner: AccountOwner,
244 target_id: ChainId,
245 recipient: Account,
246 amount: Amount,
247 },
248 OpenChain(OpenChainConfig),
251 CloseChain,
253 ChangeOwnership {
255 #[debug(skip_if = Vec::is_empty)]
257 super_owners: Vec<AccountOwner>,
258 #[debug(skip_if = Vec::is_empty)]
260 owners: Vec<(AccountOwner, u64)>,
261 #[debug(skip_if = Option::is_none)]
263 first_leader: Option<AccountOwner>,
264 multi_leader_rounds: u32,
266 open_multi_leader_rounds: bool,
270 timeout_config: TimeoutConfig,
272 },
273 ChangeApplicationPermissions(ApplicationPermissions),
275 PublishModule { module_id: ModuleId },
277 PublishDataBlob { blob_hash: CryptoHash },
279 VerifyBlob { blob_id: BlobId },
281 CreateApplication {
283 module_id: ModuleId,
284 #[serde(with = "serde_bytes")]
285 #[debug(with = "hex_debug")]
286 parameters: Vec<u8>,
287 #[serde(with = "serde_bytes")]
288 #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
289 instantiation_argument: Vec<u8>,
290 #[debug(skip_if = Vec::is_empty)]
291 required_application_ids: Vec<ApplicationId>,
292 },
293 Admin(AdminOperation),
295 ProcessNewEpoch(Epoch),
297 UpdateStream {
299 application_id: ApplicationId,
300 chain_id: ChainId,
301 stream_id: StreamId,
302 next_index: u32,
303 },
304 Checkpoint,
309}
310
311#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
313pub enum AdminOperation {
314 PublishCommitteeBlob { blob_hash: CryptoHash },
317 CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
320 RemoveCommittee { epoch: Epoch },
323}
324
325#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
327pub enum SystemMessage {
328 Credit {
331 target: AccountOwner,
332 amount: Amount,
333 source: AccountOwner,
334 },
335 Withdraw {
339 owner: AccountOwner,
340 amount: Amount,
341 recipient: Account,
342 },
343 CheckpointAck { latest_received_cursor: Cursor },
350}
351
352#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
354pub struct SystemQuery;
355
356#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
358pub struct SystemResponse {
359 pub chain_id: ChainId,
360 pub balance: Amount,
361}
362
363#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
365pub struct UserData(pub Option<[u8; 32]>);
366
367#[derive(Debug)]
368pub struct CreateApplicationResult {
369 pub app_id: ApplicationId,
370}
371
372impl<C> SystemExecutionStateView<C>
373where
374 C: Context + Clone + 'static,
375 C::Extra: ExecutionRuntimeContext,
376{
377 pub async fn is_active(&self) -> Result<bool, ViewError> {
379 Ok(self.description.get().await?.is_some()
380 && self.ownership.get().await?.is_active()
381 && self.admin_chain_id.get().is_some())
382 }
383
384 pub async fn current_committee(
386 &self,
387 ) -> Result<Option<(Epoch, Arc<Committee>)>, ExecutionError> {
388 let Some(hash) = *self.committee_hash.get() else {
389 return Ok(None);
390 };
391 let epoch = *self.epoch.get();
392 let committee = self
393 .context()
394 .extra()
395 .get_or_load_committee_by_hash(hash)
396 .await?;
397 Ok(Some((epoch, committee)))
398 }
399
400 async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
401 match self.context().extra().get_event(event_id.clone()).await? {
402 None => Err(ExecutionError::EventsNotFound(vec![event_id])),
403 Some(vec) => Ok(vec),
404 }
405 }
406
407 pub async fn execute_operation(
410 &mut self,
411 context: OperationContext,
412 operation: SystemOperation,
413 txn_tracker: &mut TransactionTracker,
414 resource_controller: &mut ResourceController<Option<AccountOwner>>,
415 ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
416 use SystemOperation::*;
417 let mut new_application = None;
418 match operation {
419 OpenChain(config) => {
420 let _chain_id = self
421 .open_chain(
422 config,
423 context.chain_id,
424 context.height,
425 context.timestamp,
426 txn_tracker,
427 )
428 .await?;
429 #[cfg(with_metrics)]
430 metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
431 }
432 ChangeOwnership {
433 super_owners,
434 owners,
435 first_leader,
436 multi_leader_rounds,
437 open_multi_leader_rounds,
438 timeout_config,
439 } => {
440 self.ownership.set(ChainOwnership {
441 super_owners: super_owners.into_iter().collect(),
442 owners: owners.into_iter().collect(),
443 first_leader,
444 multi_leader_rounds,
445 open_multi_leader_rounds,
446 timeout_config,
447 });
448 }
449 ChangeApplicationPermissions(application_permissions) => {
450 self.application_permissions.set(application_permissions);
451 }
452 CloseChain => self.close_chain(),
453 Transfer {
454 owner,
455 amount,
456 recipient,
457 } => {
458 let maybe_message = self
459 .transfer(context.authenticated_owner, None, owner, recipient, amount)
460 .await?;
461 txn_tracker.add_outgoing_messages(maybe_message);
462 }
463 Claim {
464 owner,
465 target_id,
466 recipient,
467 amount,
468 } => {
469 let maybe_message = self
470 .claim(
471 context.authenticated_owner,
472 None,
473 owner,
474 target_id,
475 recipient,
476 amount,
477 )
478 .await?;
479 txn_tracker.add_outgoing_messages(maybe_message);
480 }
481 Admin(admin_operation) => {
482 ensure!(
483 *self.admin_chain_id.get() == Some(context.chain_id),
484 ExecutionError::AdminOperationOnNonAdminChain
485 );
486 match admin_operation {
487 AdminOperation::PublishCommitteeBlob { blob_hash } => {
488 self.blob_published(
489 &BlobId::new(blob_hash, BlobType::Committee),
490 txn_tracker,
491 )?;
492 }
493 AdminOperation::CreateCommittee { epoch, blob_hash } => {
494 self.check_next_epoch(epoch)?;
495 let blob_id = BlobId::new(blob_hash, BlobType::Committee);
496 self.context()
498 .extra()
499 .get_or_load_committee_by_hash(blob_hash)
500 .await?;
501 self.blob_used(txn_tracker, blob_id).await?;
502 self.committee_hash.set(Some(blob_hash));
503 self.epoch.set(epoch);
504 let event_data = EpochEventData {
505 blob_hash,
506 timestamp: context.timestamp,
507 };
508 let stream_id = StreamId::system(EPOCH_STREAM_NAME);
509 let next_index = epoch.0.checked_add(1).ok_or(ArithmeticError::Overflow)?;
510 self.stream_event_counts.insert(&stream_id, next_index)?;
511 txn_tracker.add_event(stream_id, epoch.0, bcs::to_bytes(&event_data)?);
512 }
513 AdminOperation::RemoveCommittee { epoch } => {
514 let stream_id = StreamId::system(REMOVED_EPOCH_STREAM_NAME);
515 let count = self.stream_event_counts.get(&stream_id).await?.unwrap_or(0);
516 ensure!(
519 count == epoch.0 && epoch < *self.epoch.get(),
520 ExecutionError::InvalidCommitteeRemoval
521 );
522 let next_index = epoch.0.checked_add(1).ok_or(ArithmeticError::Overflow)?;
523 self.stream_event_counts.insert(&stream_id, next_index)?;
524 txn_tracker.add_event(stream_id, epoch.0, vec![]);
525 }
526 }
527 }
528 PublishModule { module_id } => {
529 for blob_id in module_id.bytecode_blob_ids() {
530 self.blob_published(&blob_id, txn_tracker)?;
531 }
532 }
533 CreateApplication {
534 module_id,
535 parameters,
536 instantiation_argument,
537 required_application_ids,
538 } => {
539 let CreateApplicationResult { app_id } = self
540 .create_application(
541 context.chain_id,
542 context.height,
543 module_id,
544 parameters,
545 required_application_ids,
546 txn_tracker,
547 )
548 .await?;
549 new_application = Some((app_id, instantiation_argument));
550 }
551 PublishDataBlob { blob_hash } => {
552 self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
553 }
554 VerifyBlob { blob_id } => {
555 self.assert_blob_exists(blob_id).await?;
556 resource_controller
557 .with_state(self)
558 .await?
559 .track_blob_read(0)?;
560 self.blob_used(txn_tracker, blob_id).await?;
561 }
562 ProcessNewEpoch(epoch) => {
563 self.check_next_epoch(epoch)?;
564 let admin_chain_id = self.admin_chain_id.get().ok_or_else(|| {
565 ExecutionError::InternalError(
566 "execute_operation called for uninitialized chain",
567 )
568 })?;
569 let event_id = EventId {
570 chain_id: admin_chain_id,
571 stream_id: StreamId::system(EPOCH_STREAM_NAME),
572 index: epoch.0,
573 };
574 let bytes = txn_tracker
575 .oracle(|| async {
576 let bytes = self.get_event(event_id.clone()).await?;
577 Ok(OracleResponse::Event(
578 event_id.clone(),
579 Arc::unwrap_or_clone(bytes),
580 ))
581 })
582 .await?
583 .to_event(&event_id)?;
584 let event_data: EpochEventData = bcs::from_bytes(&bytes)?;
585 let blob_id = BlobId::new(event_data.blob_hash, BlobType::Committee);
586 self.context()
588 .extra()
589 .get_or_load_committee_by_hash(event_data.blob_hash)
590 .await?;
591 self.blob_used(txn_tracker, blob_id).await?;
592 self.committee_hash.set(Some(event_data.blob_hash));
593 self.epoch.set(epoch);
594 }
595 UpdateStream {
596 application_id,
597 chain_id,
598 stream_id,
599 next_index,
600 } => {
601 let subscriptions = self
602 .event_subscriptions
603 .get_mut_or_default(&(chain_id, stream_id.clone()))
604 .await?;
605 let app_next_index = *subscriptions
606 .applications
607 .get(&application_id)
608 .ok_or(ExecutionError::UnsubscribedUpdateStream)?;
609 ensure!(
610 app_next_index < next_index,
611 ExecutionError::OutdatedUpdateStream
612 );
613 txn_tracker.add_stream_to_process(
614 application_id,
615 chain_id,
616 stream_id.clone(),
617 app_next_index,
618 next_index,
619 );
620 subscriptions
621 .applications
622 .insert(application_id, next_index);
623 subscriptions.recalculate_min();
624 let index = next_index
625 .checked_sub(1)
626 .ok_or(ArithmeticError::Underflow)?;
627 let event_id = EventId {
628 chain_id,
629 stream_id,
630 index,
631 };
632 let context = self.context();
633 let extra = context.extra();
634 let mut missing_events = Vec::new();
635 txn_tracker
636 .oracle(|| async {
637 if !extra.contains_event(event_id.clone()).await? {
638 missing_events.push(event_id.clone());
639 }
640 Ok(OracleResponse::EventExists(event_id))
641 })
642 .await?;
643 ensure!(
644 missing_events.is_empty(),
645 ExecutionError::EventsNotFound(missing_events)
646 );
647 }
648 Checkpoint => {
649 return Err(ExecutionError::InternalError(
650 "SystemOperation::Checkpoint must be dispatched at ExecutionStateView level",
651 ));
652 }
653 }
654
655 Ok(new_application)
656 }
657
658 fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
661 let expected = self.epoch.get().try_add_one()?;
662 ensure!(
663 provided == expected,
664 ExecutionError::InvalidCommitteeEpoch { provided, expected }
665 );
666 Ok(())
667 }
668
669 async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
670 if owner == &AccountOwner::CHAIN {
671 let new_balance = self.balance.get().saturating_add(amount);
672 self.balance.set(new_balance);
673 } else {
674 let balance = self.balances.get_mut_or_default(owner).await?;
675 *balance = balance.saturating_add(amount);
676 }
677 Ok(())
678 }
679
680 async fn credit_or_send_message(
681 &mut self,
682 source: AccountOwner,
683 recipient: Account,
684 amount: Amount,
685 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
686 let source_chain_id = self.context().extra().chain_id();
687 if recipient.chain_id == source_chain_id {
688 let target = recipient.owner;
690 self.credit(&target, amount).await?;
691 Ok(None)
692 } else {
693 let message = SystemMessage::Credit {
695 amount,
696 source,
697 target: recipient.owner,
698 };
699 Ok(Some(
700 OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
701 ))
702 }
703 }
704
705 pub async fn transfer(
706 &mut self,
707 authenticated_owner: Option<AccountOwner>,
708 authenticated_application_id: Option<ApplicationId>,
709 source: AccountOwner,
710 recipient: Account,
711 amount: Amount,
712 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
713 if source == AccountOwner::CHAIN {
714 let authenticated_owner =
715 authenticated_owner.ok_or(ExecutionError::UnauthenticatedTransferOwner)?;
716 ensure!(
717 self.ownership.get().await?.is_owner(&authenticated_owner),
718 ExecutionError::UnauthenticatedTransferOwner
719 );
720 } else {
721 ensure!(
722 authenticated_owner == Some(source)
723 || authenticated_application_id.map(AccountOwner::from) == Some(source),
724 ExecutionError::UnauthenticatedTransferOwner
725 );
726 }
727 ensure!(
728 amount > Amount::ZERO,
729 ExecutionError::IncorrectTransferAmount
730 );
731 self.debit(&source, amount).await?;
732 self.credit_or_send_message(source, recipient, amount).await
733 }
734
735 pub async fn claim(
736 &mut self,
737 authenticated_owner: Option<AccountOwner>,
738 authenticated_application_id: Option<ApplicationId>,
739 source: AccountOwner,
740 target_id: ChainId,
741 recipient: Account,
742 amount: Amount,
743 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
744 ensure!(
745 authenticated_owner == Some(source)
746 || authenticated_application_id.map(AccountOwner::from) == Some(source),
747 ExecutionError::UnauthenticatedClaimOwner
748 );
749 ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
750
751 let current_chain_id = self.context().extra().chain_id();
752 if target_id == current_chain_id {
753 self.debit(&source, amount).await?;
755 self.credit_or_send_message(source, recipient, amount).await
756 } else {
757 let message = SystemMessage::Withdraw {
759 amount,
760 owner: source,
761 recipient,
762 };
763 Ok(Some(
764 OutgoingMessage::new(target_id, message)
765 .with_authenticated_owner(authenticated_owner),
766 ))
767 }
768 }
769
770 pub async fn approve(
771 &mut self,
772 authenticated_owner: Option<AccountOwner>,
773 authenticated_application_id: Option<ApplicationId>,
774 owner: AccountOwner,
775 spender: AccountOwner,
776 amount: Amount,
777 ) -> Result<(), ExecutionError> {
778 ensure!(
779 authenticated_owner == Some(owner)
780 || authenticated_application_id.map(AccountOwner::from) == Some(owner),
781 ExecutionError::UnauthenticatedTransferOwner
782 );
783
784 let owner_spender = OwnerSpender::new(owner, spender);
785 if amount == Amount::ZERO {
786 self.allowances.remove(&owner_spender)?;
787 return Ok(());
788 }
789 let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
790 *allowance = amount;
791
792 Ok(())
793 }
794
795 pub async fn transfer_from(
796 &mut self,
797 authenticated_owner: Option<AccountOwner>,
798 authenticated_application_id: Option<ApplicationId>,
799 owner: AccountOwner,
800 spender: AccountOwner,
801 recipient: Account,
802 amount: Amount,
803 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
804 ensure!(
805 authenticated_owner == Some(spender)
806 || authenticated_application_id.map(AccountOwner::from) == Some(spender),
807 ExecutionError::UnauthenticatedTransferOwner
808 );
809 ensure!(
810 amount > Amount::ZERO,
811 ExecutionError::IncorrectTransferAmount
812 );
813
814 let owner_spender = OwnerSpender::new(owner, spender);
816 let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
817
818 allowance
819 .try_sub_assign(amount)
820 .map_err(|_| ExecutionError::InsufficientAllowance {
821 allowance: *allowance,
822 owner,
823 spender,
824 })?;
825
826 if allowance.is_zero() {
827 self.allowances.remove(&owner_spender)?;
828 }
829
830 self.debit(&owner, amount).await?;
832
833 self.credit_or_send_message(owner, recipient, amount).await
835 }
836
837 async fn debit(
839 &mut self,
840 account: &AccountOwner,
841 amount: Amount,
842 ) -> Result<(), ExecutionError> {
843 let balance = if account == &AccountOwner::CHAIN {
844 self.balance.get_mut()
845 } else {
846 self.balances.get_mut(account).await?.ok_or_else(|| {
847 ExecutionError::InsufficientBalance {
848 balance: Amount::ZERO,
849 account: *account,
850 }
851 })?
852 };
853
854 balance
855 .try_sub_assign(amount)
856 .map_err(|_| ExecutionError::InsufficientBalance {
857 balance: *balance,
858 account: *account,
859 })?;
860
861 if account != &AccountOwner::CHAIN && balance.is_zero() {
862 self.balances.remove(account)?;
863 }
864
865 Ok(())
866 }
867
868 pub async fn execute_message(
870 &mut self,
871 context: MessageContext,
872 message: SystemMessage,
873 ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
874 let mut outcome = Vec::new();
875 use SystemMessage::*;
876 match message {
877 Credit {
878 amount,
879 source,
880 target,
881 } => {
882 let receiver = if context.is_bouncing { source } else { target };
883 self.credit(&receiver, amount).await?;
884 }
885 Withdraw {
886 amount,
887 owner,
888 recipient,
889 } => {
890 self.debit(&owner, amount).await?;
891 if let Some(message) = self
892 .credit_or_send_message(owner, recipient, amount)
893 .await?
894 {
895 outcome.push(message);
896 }
897 }
898 CheckpointAck {
899 latest_received_cursor,
900 } => {
901 self.finalized_sent_messages.insert(
902 &context.origin,
903 (latest_received_cursor, context.origin_certificate_hash),
904 )?;
905 if let Some(mut cursors) =
911 self.unfinalized_message_blocks.get(&context.origin).await?
912 {
913 let retained = cursors.split_off(&latest_received_cursor);
914 if retained.is_empty() {
915 self.unfinalized_message_blocks.remove(&context.origin)?;
916 } else {
917 self.unfinalized_message_blocks
918 .insert(&context.origin, retained)?;
919 }
920 }
921 }
922 }
923 Ok(outcome)
924 }
925
926 pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
929 if self.description.get().await?.is_some() {
930 return Ok(true);
932 }
933 let description_blob = self
934 .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
935 .await?;
936 let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
937 let InitialChainConfig {
938 ownership,
939 epoch,
940 balance,
941 application_permissions,
942 } = description.config().clone();
943 self.timestamp.set(description.timestamp());
944 self.description.set(Some(description));
945 self.epoch.set(epoch);
946
947 let committee_hash = *self
948 .context()
949 .extra()
950 .get_committee_hashes(epoch..=epoch)
951 .await?
952 .get(&epoch)
953 .expect("get_committee_hashes returns the requested epoch on success");
954 let admin_chain_id = self
955 .context()
956 .extra()
957 .get_network_description()
958 .await?
959 .ok_or(ExecutionError::NoNetworkDescriptionFound)?
960 .admin_chain_id;
961
962 self.committee_hash.set(Some(committee_hash));
963 self.admin_chain_id.set(Some(admin_chain_id));
964 self.ownership.set(ownership);
965 self.balance.set(balance);
966 self.application_permissions.set(application_permissions);
967 Ok(false)
968 }
969
970 pub fn handle_query(
971 &mut self,
972 context: QueryContext,
973 _query: SystemQuery,
974 ) -> QueryOutcome<SystemResponse> {
975 let response = SystemResponse {
976 chain_id: context.chain_id,
977 balance: *self.balance.get(),
978 };
979 QueryOutcome {
980 response,
981 operations: vec![],
982 }
983 }
984
985 pub async fn open_chain(
988 &mut self,
989 config: OpenChainConfig,
990 parent: ChainId,
991 block_height: BlockHeight,
992 timestamp: Timestamp,
993 txn_tracker: &mut TransactionTracker,
994 ) -> Result<ChainId, ExecutionError> {
995 let chain_index = txn_tracker.next_chain_index();
996 let chain_origin = ChainOrigin::Child {
997 parent,
998 block_height,
999 chain_index,
1000 };
1001 let init_chain_config = config.init_chain_config(*self.epoch.get());
1002 let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
1003 let child_id = chain_description.id();
1004 self.debit(&AccountOwner::CHAIN, config.balance).await?;
1005 let blob = Blob::new_chain_description(&chain_description);
1006 txn_tracker.add_created_blob(blob);
1007 Ok(child_id)
1008 }
1009
1010 pub fn close_chain(&mut self) {
1011 self.closed.set(true);
1012 }
1013
1014 pub async fn create_application(
1015 &mut self,
1016 chain_id: ChainId,
1017 block_height: BlockHeight,
1018 module_id: ModuleId,
1019 parameters: Vec<u8>,
1020 required_application_ids: Vec<ApplicationId>,
1021 txn_tracker: &mut TransactionTracker,
1022 ) -> Result<CreateApplicationResult, ExecutionError> {
1023 let application_index = txn_tracker.next_application_index();
1024
1025 let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
1026 for blob_id in blob_ids {
1029 self.blob_used(txn_tracker, blob_id).await?;
1030 }
1031
1032 let application_description = ApplicationDescription {
1033 module_id,
1034 creator_chain_id: chain_id,
1035 block_height,
1036 application_index,
1037 parameters,
1038 required_application_ids,
1039 };
1040 self.check_required_applications(&application_description, txn_tracker)
1041 .await?;
1042
1043 let blob = Blob::new_application_description(&application_description);
1044 self.used_blobs.insert(&blob.id())?;
1045 txn_tracker.add_created_blob(blob);
1046
1047 Ok(CreateApplicationResult {
1048 app_id: ApplicationId::from(&application_description),
1049 })
1050 }
1051
1052 async fn check_required_applications(
1053 &mut self,
1054 application_description: &ApplicationDescription,
1055 txn_tracker: &mut TransactionTracker,
1056 ) -> Result<(), ExecutionError> {
1057 for required_id in &application_description.required_application_ids {
1059 Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
1060 }
1061 Ok(())
1062 }
1063
1064 pub async fn describe_application(
1066 &mut self,
1067 id: ApplicationId,
1068 txn_tracker: &mut TransactionTracker,
1069 ) -> Result<ApplicationDescription, ExecutionError> {
1070 let blob_id = id.description_blob_id();
1071 let content = match txn_tracker.created_blobs().get(&blob_id) {
1072 Some(content) => content.clone(),
1073 None => self.read_blob_content(blob_id).await?,
1074 };
1075 self.blob_used(txn_tracker, blob_id).await?;
1076 let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
1077
1078 let blob_ids = self
1079 .check_bytecode_blobs(&description.module_id, txn_tracker)
1080 .await?;
1081 for blob_id in blob_ids {
1084 self.blob_used(txn_tracker, blob_id).await?;
1085 }
1086
1087 self.check_required_applications(&description, txn_tracker)
1088 .await?;
1089
1090 Ok(description)
1091 }
1092
1093 pub(crate) async fn blob_used(
1096 &mut self,
1097 txn_tracker: &mut TransactionTracker,
1098 blob_id: BlobId,
1099 ) -> Result<bool, ExecutionError> {
1100 if self.used_blobs.contains(&blob_id).await? {
1101 return Ok(false); }
1103 self.used_blobs.insert(&blob_id)?;
1104 txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1105 Ok(true)
1106 }
1107
1108 fn blob_published(
1111 &mut self,
1112 blob_id: &BlobId,
1113 txn_tracker: &mut TransactionTracker,
1114 ) -> Result<(), ExecutionError> {
1115 self.used_blobs.insert(blob_id)?;
1116 txn_tracker.add_published_blob(*blob_id);
1117 Ok(())
1118 }
1119
1120 pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1121 match self.context().extra().get_blob(blob_id).await {
1122 Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
1123 Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1124 Err(error) => Err(error.into()),
1125 }
1126 }
1127
1128 pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1129 if self.context().extra().contains_blob(blob_id).await? {
1130 Ok(())
1131 } else {
1132 Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1133 }
1134 }
1135
1136 async fn check_bytecode_blobs(
1137 &self,
1138 module_id: &ModuleId,
1139 txn_tracker: &TransactionTracker,
1140 ) -> Result<Vec<BlobId>, ExecutionError> {
1141 let blob_ids = module_id.bytecode_blob_ids();
1142
1143 let mut missing_blobs = Vec::new();
1144 for blob_id in &blob_ids {
1145 if txn_tracker.created_blobs().contains_key(blob_id) {
1147 continue; }
1149 if !self.context().extra().contains_blob(*blob_id).await? {
1151 missing_blobs.push(*blob_id);
1152 }
1153 }
1154 ensure!(
1155 missing_blobs.is_empty(),
1156 ExecutionError::BlobsNotFound(missing_blobs)
1157 );
1158
1159 Ok(blob_ids)
1160 }
1161}