1#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10 collections::{BTreeMap, BTreeSet},
11 sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17 crypto::CryptoHash,
18 data_types::{
19 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20 ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
21 },
22 ensure, hex_debug,
23 identifiers::{
24 Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, OwnerSpender, StreamId,
25 },
26 ownership::{ChainOwnership, TimeoutConfig},
27};
28use linera_views::{
29 context::Context,
30 lazy_register_view::LazyRegisterView,
31 map_view::MapView,
32 register_view::RegisterView,
33 set_view::SetView,
34 views::{ClonableView, ReplaceContext, View},
35 ViewError,
36};
37use serde::{Deserialize, Serialize};
38
39#[cfg(test)]
40use crate::test_utils::SystemExecutionState;
41use crate::{
42 committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
43 ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
44 OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
45};
46
47pub static EPOCH_STREAM_NAME: &[u8] = &[0];
49pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct EpochEventData {
55 pub blob_hash: CryptoHash,
57 pub timestamp: Timestamp,
59}
60
61#[cfg(with_metrics)]
63mod metrics {
64 use std::sync::LazyLock;
65
66 use linera_base::prometheus_util::register_int_counter_vec;
67 use prometheus::IntCounterVec;
68
69 pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
70 register_int_counter_vec(
71 "open_chain_count",
72 "The number of times the `OpenChain` operation was executed",
73 &[],
74 )
75 });
76}
77
78#[derive(Debug, ClonableView, View, Allocative)]
80#[allocative(bound = "C")]
81pub struct SystemExecutionStateView<C> {
82 pub description: LazyRegisterView<C, Option<ChainDescription>>,
84 pub epoch: RegisterView<C, Epoch>,
86 pub admin_chain_id: RegisterView<C, Option<ChainId>>,
88 pub committees: RegisterView<C, BTreeMap<Epoch, CryptoHash>>,
94 pub ownership: LazyRegisterView<C, ChainOwnership>,
96 pub balance: RegisterView<C, Amount>,
98 pub balances: MapView<C, AccountOwner, Amount>,
100 pub allowances: MapView<C, OwnerSpender, Amount>,
102 pub timestamp: RegisterView<C, Timestamp>,
104 pub closed: RegisterView<C, bool>,
106 pub application_permissions: LazyRegisterView<C, ApplicationPermissions>,
108 pub used_blobs: SetView<C, BlobId>,
110 pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
112 pub stream_event_counts: MapView<C, StreamId, u32>,
114}
115
116impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
117 type Target = SystemExecutionStateView<C2>;
118
119 async fn with_context(
120 &mut self,
121 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
122 ) -> Self::Target {
123 SystemExecutionStateView {
124 description: self.description.with_context(ctx.clone()).await,
125 epoch: self.epoch.with_context(ctx.clone()).await,
126 admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
127 committees: self.committees.with_context(ctx.clone()).await,
128 ownership: self.ownership.with_context(ctx.clone()).await,
129 balance: self.balance.with_context(ctx.clone()).await,
130 balances: self.balances.with_context(ctx.clone()).await,
131 allowances: self.allowances.with_context(ctx.clone()).await,
132 timestamp: self.timestamp.with_context(ctx.clone()).await,
133 closed: self.closed.with_context(ctx.clone()).await,
134 application_permissions: self.application_permissions.with_context(ctx.clone()).await,
135 used_blobs: self.used_blobs.with_context(ctx.clone()).await,
136 event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
137 stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
138 }
139 }
140}
141
142#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
144pub struct EventSubscriptions {
145 pub next_index: u32,
148 pub applications: BTreeSet<ApplicationId>,
150}
151
152#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
154pub struct OpenChainConfig {
155 pub ownership: ChainOwnership,
157 pub balance: Amount,
159 pub application_permissions: ApplicationPermissions,
161}
162
163impl OpenChainConfig {
164 pub fn init_chain_config(
167 &self,
168 epoch: Epoch,
169 min_active_epoch: Epoch,
170 max_active_epoch: Epoch,
171 ) -> InitialChainConfig {
172 InitialChainConfig {
173 application_permissions: self.application_permissions.clone(),
174 balance: self.balance,
175 epoch,
176 min_active_epoch,
177 max_active_epoch,
178 ownership: self.ownership.clone(),
179 }
180 }
181}
182
183#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
185pub enum SystemOperation {
186 Transfer {
189 owner: AccountOwner,
190 recipient: Account,
191 amount: Amount,
192 },
193 Claim {
197 owner: AccountOwner,
198 target_id: ChainId,
199 recipient: Account,
200 amount: Amount,
201 },
202 OpenChain(OpenChainConfig),
205 CloseChain,
207 ChangeOwnership {
209 #[debug(skip_if = Vec::is_empty)]
211 super_owners: Vec<AccountOwner>,
212 #[debug(skip_if = Vec::is_empty)]
214 owners: Vec<(AccountOwner, u64)>,
215 #[debug(skip_if = Option::is_none)]
217 first_leader: Option<AccountOwner>,
218 multi_leader_rounds: u32,
220 open_multi_leader_rounds: bool,
224 timeout_config: TimeoutConfig,
226 },
227 ChangeApplicationPermissions(ApplicationPermissions),
229 PublishModule { module_id: ModuleId },
231 PublishDataBlob { blob_hash: CryptoHash },
233 VerifyBlob { blob_id: BlobId },
235 CreateApplication {
237 module_id: ModuleId,
238 #[serde(with = "serde_bytes")]
239 #[debug(with = "hex_debug")]
240 parameters: Vec<u8>,
241 #[serde(with = "serde_bytes")]
242 #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
243 instantiation_argument: Vec<u8>,
244 #[debug(skip_if = Vec::is_empty)]
245 required_application_ids: Vec<ApplicationId>,
246 },
247 Admin(AdminOperation),
249 ProcessNewEpoch(Epoch),
251 ProcessRemovedEpoch(Epoch),
253 UpdateStreams(Vec<(ChainId, StreamId, u32)>),
255}
256
257#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
259pub enum AdminOperation {
260 PublishCommitteeBlob { blob_hash: CryptoHash },
263 CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
266 RemoveCommittee { epoch: Epoch },
270}
271
272#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
274pub enum SystemMessage {
275 Credit {
278 target: AccountOwner,
279 amount: Amount,
280 source: AccountOwner,
281 },
282 Withdraw {
286 owner: AccountOwner,
287 amount: Amount,
288 recipient: Account,
289 },
290}
291
292#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
294pub struct SystemQuery;
295
296#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
298pub struct SystemResponse {
299 pub chain_id: ChainId,
300 pub balance: Amount,
301}
302
303#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
305pub struct UserData(pub Option<[u8; 32]>);
306
307#[derive(Debug)]
308pub struct CreateApplicationResult {
309 pub app_id: ApplicationId,
310}
311
312impl<C> SystemExecutionStateView<C>
313where
314 C: Context + Clone + 'static,
315 C::Extra: ExecutionRuntimeContext,
316{
317 pub async fn is_active(&self) -> Result<bool, ViewError> {
319 Ok(self.description.get().await?.is_some()
320 && self.ownership.get().await?.is_active()
321 && self.committees.get().contains_key(self.epoch.get())
322 && self.admin_chain_id.get().is_some())
323 }
324
325 pub async fn current_committee(
327 &self,
328 ) -> Result<Option<(Epoch, Arc<Committee>)>, ExecutionError> {
329 let epoch = *self.epoch.get();
330 let Some(&hash) = self.committees.get().get(&epoch) else {
331 return Ok(None);
332 };
333 let committee = self
334 .context()
335 .extra()
336 .get_or_load_committee_by_hash(hash)
337 .await?;
338 Ok(Some((epoch, committee)))
339 }
340
341 async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
342 match self.context().extra().get_event(event_id.clone()).await? {
343 None => Err(ExecutionError::EventsNotFound(vec![event_id])),
344 Some(vec) => Ok(vec),
345 }
346 }
347
348 pub async fn execute_operation(
351 &mut self,
352 context: OperationContext,
353 operation: SystemOperation,
354 txn_tracker: &mut TransactionTracker,
355 resource_controller: &mut ResourceController<Option<AccountOwner>>,
356 ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
357 use SystemOperation::*;
358 let mut new_application = None;
359 match operation {
360 OpenChain(config) => {
361 let _chain_id = self
362 .open_chain(
363 config,
364 context.chain_id,
365 context.height,
366 context.timestamp,
367 txn_tracker,
368 )
369 .await?;
370 #[cfg(with_metrics)]
371 metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
372 }
373 ChangeOwnership {
374 super_owners,
375 owners,
376 first_leader,
377 multi_leader_rounds,
378 open_multi_leader_rounds,
379 timeout_config,
380 } => {
381 self.ownership.set(ChainOwnership {
382 super_owners: super_owners.into_iter().collect(),
383 owners: owners.into_iter().collect(),
384 first_leader,
385 multi_leader_rounds,
386 open_multi_leader_rounds,
387 timeout_config,
388 });
389 }
390 ChangeApplicationPermissions(application_permissions) => {
391 self.application_permissions.set(application_permissions);
392 }
393 CloseChain => self.close_chain(),
394 Transfer {
395 owner,
396 amount,
397 recipient,
398 } => {
399 let maybe_message = self
400 .transfer(context.authenticated_owner, None, owner, recipient, amount)
401 .await?;
402 txn_tracker.add_outgoing_messages(maybe_message);
403 }
404 Claim {
405 owner,
406 target_id,
407 recipient,
408 amount,
409 } => {
410 let maybe_message = self
411 .claim(
412 context.authenticated_owner,
413 None,
414 owner,
415 target_id,
416 recipient,
417 amount,
418 )
419 .await?;
420 txn_tracker.add_outgoing_messages(maybe_message);
421 }
422 Admin(admin_operation) => {
423 ensure!(
424 *self.admin_chain_id.get() == Some(context.chain_id),
425 ExecutionError::AdminOperationOnNonAdminChain
426 );
427 match admin_operation {
428 AdminOperation::PublishCommitteeBlob { blob_hash } => {
429 self.blob_published(
430 &BlobId::new(blob_hash, BlobType::Committee),
431 txn_tracker,
432 )?;
433 }
434 AdminOperation::CreateCommittee { epoch, blob_hash } => {
435 self.check_next_epoch(epoch)?;
436 let blob_id = BlobId::new(blob_hash, BlobType::Committee);
437 self.context()
439 .extra()
440 .get_or_load_committee_by_hash(blob_hash)
441 .await?;
442 self.blob_used(txn_tracker, blob_id).await?;
443 self.committees.get_mut().insert(epoch, blob_hash);
444 self.epoch.set(epoch);
445 let event_data = EpochEventData {
446 blob_hash,
447 timestamp: context.timestamp,
448 };
449 txn_tracker.add_event(
450 StreamId::system(EPOCH_STREAM_NAME),
451 epoch.0,
452 bcs::to_bytes(&event_data)?,
453 );
454 }
455 AdminOperation::RemoveCommittee { epoch } => {
456 ensure!(
457 self.committees.get_mut().remove(&epoch).is_some(),
458 ExecutionError::InvalidCommitteeRemoval
459 );
460 txn_tracker.add_event(
461 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
462 epoch.0,
463 vec![],
464 );
465 }
466 }
467 }
468 PublishModule { module_id } => {
469 for blob_id in module_id.bytecode_blob_ids() {
470 self.blob_published(&blob_id, txn_tracker)?;
471 }
472 }
473 CreateApplication {
474 module_id,
475 parameters,
476 instantiation_argument,
477 required_application_ids,
478 } => {
479 let CreateApplicationResult { app_id } = self
480 .create_application(
481 context.chain_id,
482 context.height,
483 module_id,
484 parameters,
485 required_application_ids,
486 txn_tracker,
487 )
488 .await?;
489 new_application = Some((app_id, instantiation_argument));
490 }
491 PublishDataBlob { blob_hash } => {
492 self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
493 }
494 VerifyBlob { blob_id } => {
495 self.assert_blob_exists(blob_id).await?;
496 resource_controller
497 .with_state(self)
498 .await?
499 .track_blob_read(0)?;
500 self.blob_used(txn_tracker, blob_id).await?;
501 }
502 ProcessNewEpoch(epoch) => {
503 self.check_next_epoch(epoch)?;
504 let admin_chain_id = self
505 .admin_chain_id
506 .get()
507 .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
508 let event_id = EventId {
509 chain_id: admin_chain_id,
510 stream_id: StreamId::system(EPOCH_STREAM_NAME),
511 index: epoch.0,
512 };
513 let bytes = txn_tracker
514 .oracle(|| async {
515 let bytes = self.get_event(event_id.clone()).await?;
516 Ok(OracleResponse::Event(
517 event_id.clone(),
518 Arc::unwrap_or_clone(bytes),
519 ))
520 })
521 .await?
522 .to_event(&event_id)?;
523 let event_data: EpochEventData = bcs::from_bytes(&bytes)?;
524 let blob_id = BlobId::new(event_data.blob_hash, BlobType::Committee);
525 self.context()
527 .extra()
528 .get_or_load_committee_by_hash(event_data.blob_hash)
529 .await?;
530 self.blob_used(txn_tracker, blob_id).await?;
531 self.committees
532 .get_mut()
533 .insert(epoch, event_data.blob_hash);
534 self.epoch.set(epoch);
535 }
536 ProcessRemovedEpoch(epoch) => {
537 ensure!(
538 self.committees.get_mut().remove(&epoch).is_some(),
539 ExecutionError::InvalidCommitteeRemoval
540 );
541 let admin_chain_id = self
542 .admin_chain_id
543 .get()
544 .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
545 let event_id = EventId {
546 chain_id: admin_chain_id,
547 stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
548 index: epoch.0,
549 };
550 txn_tracker
551 .oracle(|| async {
552 let bytes = self.get_event(event_id.clone()).await?;
553 Ok(OracleResponse::Event(event_id, Arc::unwrap_or_clone(bytes)))
554 })
555 .await?;
556 }
557 UpdateStreams(streams) => {
558 let mut missing_events = Vec::new();
559 for (chain_id, stream_id, next_index) in streams {
560 let subscriptions = self
561 .event_subscriptions
562 .get_mut_or_default(&(chain_id, stream_id.clone()))
563 .await?;
564 ensure!(
565 subscriptions.next_index < next_index,
566 ExecutionError::OutdatedUpdateStreams
567 );
568 for application_id in &subscriptions.applications {
569 txn_tracker.add_stream_to_process(
570 *application_id,
571 chain_id,
572 stream_id.clone(),
573 subscriptions.next_index,
574 next_index,
575 );
576 }
577 subscriptions.next_index = next_index;
578 let index = next_index
579 .checked_sub(1)
580 .ok_or(ArithmeticError::Underflow)?;
581 let event_id = EventId {
582 chain_id,
583 stream_id,
584 index,
585 };
586 let context = self.context();
587 let extra = context.extra();
588 txn_tracker
589 .oracle(|| async {
590 if !extra.contains_event(event_id.clone()).await? {
591 missing_events.push(event_id.clone());
592 }
593 Ok(OracleResponse::EventExists(event_id))
594 })
595 .await?;
596 }
597 ensure!(
598 missing_events.is_empty(),
599 ExecutionError::EventsNotFound(missing_events)
600 );
601 }
602 }
603
604 Ok(new_application)
605 }
606
607 fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
610 let expected = self.epoch.get().try_add_one()?;
611 ensure!(
612 provided == expected,
613 ExecutionError::InvalidCommitteeEpoch { provided, expected }
614 );
615 Ok(())
616 }
617
618 async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
619 if owner == &AccountOwner::CHAIN {
620 let new_balance = self.balance.get().saturating_add(amount);
621 self.balance.set(new_balance);
622 } else {
623 let balance = self.balances.get_mut_or_default(owner).await?;
624 *balance = balance.saturating_add(amount);
625 }
626 Ok(())
627 }
628
629 async fn credit_or_send_message(
630 &mut self,
631 source: AccountOwner,
632 recipient: Account,
633 amount: Amount,
634 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
635 let source_chain_id = self.context().extra().chain_id();
636 if recipient.chain_id == source_chain_id {
637 let target = recipient.owner;
639 self.credit(&target, amount).await?;
640 Ok(None)
641 } else {
642 let message = SystemMessage::Credit {
644 amount,
645 source,
646 target: recipient.owner,
647 };
648 Ok(Some(
649 OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
650 ))
651 }
652 }
653
654 pub async fn transfer(
655 &mut self,
656 authenticated_owner: Option<AccountOwner>,
657 authenticated_application_id: Option<ApplicationId>,
658 source: AccountOwner,
659 recipient: Account,
660 amount: Amount,
661 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
662 if source == AccountOwner::CHAIN {
663 ensure!(
664 authenticated_owner.is_some()
665 && self
666 .ownership
667 .get()
668 .await?
669 .is_owner(&authenticated_owner.unwrap()),
670 ExecutionError::UnauthenticatedTransferOwner
671 );
672 } else {
673 ensure!(
674 authenticated_owner == Some(source)
675 || authenticated_application_id.map(AccountOwner::from) == Some(source),
676 ExecutionError::UnauthenticatedTransferOwner
677 );
678 }
679 ensure!(
680 amount > Amount::ZERO,
681 ExecutionError::IncorrectTransferAmount
682 );
683 self.debit(&source, amount).await?;
684 self.credit_or_send_message(source, recipient, amount).await
685 }
686
687 pub async fn claim(
688 &mut self,
689 authenticated_owner: Option<AccountOwner>,
690 authenticated_application_id: Option<ApplicationId>,
691 source: AccountOwner,
692 target_id: ChainId,
693 recipient: Account,
694 amount: Amount,
695 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
696 ensure!(
697 authenticated_owner == Some(source)
698 || authenticated_application_id.map(AccountOwner::from) == Some(source),
699 ExecutionError::UnauthenticatedClaimOwner
700 );
701 ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
702
703 let current_chain_id = self.context().extra().chain_id();
704 if target_id == current_chain_id {
705 self.debit(&source, amount).await?;
707 self.credit_or_send_message(source, recipient, amount).await
708 } else {
709 let message = SystemMessage::Withdraw {
711 amount,
712 owner: source,
713 recipient,
714 };
715 Ok(Some(
716 OutgoingMessage::new(target_id, message)
717 .with_authenticated_owner(authenticated_owner),
718 ))
719 }
720 }
721
722 pub async fn approve(
723 &mut self,
724 authenticated_owner: Option<AccountOwner>,
725 authenticated_application_id: Option<ApplicationId>,
726 owner: AccountOwner,
727 spender: AccountOwner,
728 amount: Amount,
729 ) -> Result<(), ExecutionError> {
730 ensure!(
731 authenticated_owner == Some(owner)
732 || authenticated_application_id.map(AccountOwner::from) == Some(owner),
733 ExecutionError::UnauthenticatedTransferOwner
734 );
735
736 let owner_spender = OwnerSpender::new(owner, spender);
737 if amount == Amount::ZERO {
738 self.allowances.remove(&owner_spender)?;
739 return Ok(());
740 }
741 let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
742 *allowance = amount;
743
744 Ok(())
745 }
746
747 pub async fn transfer_from(
748 &mut self,
749 authenticated_owner: Option<AccountOwner>,
750 authenticated_application_id: Option<ApplicationId>,
751 owner: AccountOwner,
752 spender: AccountOwner,
753 recipient: Account,
754 amount: Amount,
755 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
756 ensure!(
757 authenticated_owner == Some(spender)
758 || authenticated_application_id.map(AccountOwner::from) == Some(spender),
759 ExecutionError::UnauthenticatedTransferOwner
760 );
761 ensure!(
762 amount > Amount::ZERO,
763 ExecutionError::IncorrectTransferAmount
764 );
765
766 let owner_spender = OwnerSpender::new(owner, spender);
768 let allowance = self.allowances.get_mut_or_default(&owner_spender).await?;
769
770 allowance
771 .try_sub_assign(amount)
772 .map_err(|_| ExecutionError::InsufficientAllowance {
773 allowance: *allowance,
774 owner,
775 spender,
776 })?;
777
778 if allowance.is_zero() {
779 self.allowances.remove(&owner_spender)?;
780 }
781
782 self.debit(&owner, amount).await?;
784
785 self.credit_or_send_message(owner, recipient, amount).await
787 }
788
789 async fn debit(
791 &mut self,
792 account: &AccountOwner,
793 amount: Amount,
794 ) -> Result<(), ExecutionError> {
795 let balance = if account == &AccountOwner::CHAIN {
796 self.balance.get_mut()
797 } else {
798 self.balances.get_mut(account).await?.ok_or_else(|| {
799 ExecutionError::InsufficientBalance {
800 balance: Amount::ZERO,
801 account: *account,
802 }
803 })?
804 };
805
806 balance
807 .try_sub_assign(amount)
808 .map_err(|_| ExecutionError::InsufficientBalance {
809 balance: *balance,
810 account: *account,
811 })?;
812
813 if account != &AccountOwner::CHAIN && balance.is_zero() {
814 self.balances.remove(account)?;
815 }
816
817 Ok(())
818 }
819
820 pub async fn execute_message(
822 &mut self,
823 context: MessageContext,
824 message: SystemMessage,
825 ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
826 let mut outcome = Vec::new();
827 use SystemMessage::*;
828 match message {
829 Credit {
830 amount,
831 source,
832 target,
833 } => {
834 let receiver = if context.is_bouncing { source } else { target };
835 self.credit(&receiver, amount).await?;
836 }
837 Withdraw {
838 amount,
839 owner,
840 recipient,
841 } => {
842 self.debit(&owner, amount).await?;
843 if let Some(message) = self
844 .credit_or_send_message(owner, recipient, amount)
845 .await?
846 {
847 outcome.push(message);
848 }
849 }
850 }
851 Ok(outcome)
852 }
853
854 pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
857 if self.description.get().await?.is_some() {
858 return Ok(true);
860 }
861 let description_blob = self
862 .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
863 .await?;
864 let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
865 let InitialChainConfig {
866 ownership,
867 epoch,
868 balance,
869 min_active_epoch,
870 max_active_epoch,
871 application_permissions,
872 } = description.config().clone();
873 self.timestamp.set(description.timestamp());
874 self.description.set(Some(description));
875 self.epoch.set(epoch);
876
877 let committees = self
878 .context()
879 .extra()
880 .get_committee_hashes(min_active_epoch..=max_active_epoch)
881 .await?;
882 let admin_chain_id = self
883 .context()
884 .extra()
885 .get_network_description()
886 .await?
887 .ok_or(ExecutionError::NoNetworkDescriptionFound)?
888 .admin_chain_id;
889
890 self.committees.set(committees);
891 self.admin_chain_id.set(Some(admin_chain_id));
892 self.ownership.set(ownership);
893 self.balance.set(balance);
894 self.application_permissions.set(application_permissions);
895 Ok(false)
896 }
897
898 pub fn handle_query(
899 &mut self,
900 context: QueryContext,
901 _query: SystemQuery,
902 ) -> QueryOutcome<SystemResponse> {
903 let response = SystemResponse {
904 chain_id: context.chain_id,
905 balance: *self.balance.get(),
906 };
907 QueryOutcome {
908 response,
909 operations: vec![],
910 }
911 }
912
913 pub async fn open_chain(
916 &mut self,
917 config: OpenChainConfig,
918 parent: ChainId,
919 block_height: BlockHeight,
920 timestamp: Timestamp,
921 txn_tracker: &mut TransactionTracker,
922 ) -> Result<ChainId, ExecutionError> {
923 let chain_index = txn_tracker.next_chain_index();
924 let chain_origin = ChainOrigin::Child {
925 parent,
926 block_height,
927 chain_index,
928 };
929 let committees = self.committees.get();
930 let init_chain_config = config.init_chain_config(
931 *self.epoch.get(),
932 committees.keys().min().copied().unwrap_or(Epoch::ZERO),
933 committees.keys().max().copied().unwrap_or(Epoch::ZERO),
934 );
935 let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
936 let child_id = chain_description.id();
937 self.debit(&AccountOwner::CHAIN, config.balance).await?;
938 let blob = Blob::new_chain_description(&chain_description);
939 txn_tracker.add_created_blob(blob);
940 Ok(child_id)
941 }
942
943 pub fn close_chain(&mut self) {
944 self.closed.set(true);
945 }
946
947 pub async fn create_application(
948 &mut self,
949 chain_id: ChainId,
950 block_height: BlockHeight,
951 module_id: ModuleId,
952 parameters: Vec<u8>,
953 required_application_ids: Vec<ApplicationId>,
954 txn_tracker: &mut TransactionTracker,
955 ) -> Result<CreateApplicationResult, ExecutionError> {
956 let application_index = txn_tracker.next_application_index();
957
958 let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
959 for blob_id in blob_ids {
962 self.blob_used(txn_tracker, blob_id).await?;
963 }
964
965 let application_description = ApplicationDescription {
966 module_id,
967 creator_chain_id: chain_id,
968 block_height,
969 application_index,
970 parameters,
971 required_application_ids,
972 };
973 self.check_required_applications(&application_description, txn_tracker)
974 .await?;
975
976 let blob = Blob::new_application_description(&application_description);
977 self.used_blobs.insert(&blob.id())?;
978 txn_tracker.add_created_blob(blob);
979
980 Ok(CreateApplicationResult {
981 app_id: ApplicationId::from(&application_description),
982 })
983 }
984
985 async fn check_required_applications(
986 &mut self,
987 application_description: &ApplicationDescription,
988 txn_tracker: &mut TransactionTracker,
989 ) -> Result<(), ExecutionError> {
990 for required_id in &application_description.required_application_ids {
992 Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
993 }
994 Ok(())
995 }
996
997 pub async fn describe_application(
999 &mut self,
1000 id: ApplicationId,
1001 txn_tracker: &mut TransactionTracker,
1002 ) -> Result<ApplicationDescription, ExecutionError> {
1003 let blob_id = id.description_blob_id();
1004 let content = match txn_tracker.created_blobs().get(&blob_id) {
1005 Some(content) => content.clone(),
1006 None => self.read_blob_content(blob_id).await?,
1007 };
1008 self.blob_used(txn_tracker, blob_id).await?;
1009 let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
1010
1011 let blob_ids = self
1012 .check_bytecode_blobs(&description.module_id, txn_tracker)
1013 .await?;
1014 for blob_id in blob_ids {
1017 self.blob_used(txn_tracker, blob_id).await?;
1018 }
1019
1020 self.check_required_applications(&description, txn_tracker)
1021 .await?;
1022
1023 Ok(description)
1024 }
1025
1026 pub(crate) async fn blob_used(
1029 &mut self,
1030 txn_tracker: &mut TransactionTracker,
1031 blob_id: BlobId,
1032 ) -> Result<bool, ExecutionError> {
1033 if self.used_blobs.contains(&blob_id).await? {
1034 return Ok(false); }
1036 self.used_blobs.insert(&blob_id)?;
1037 txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1038 Ok(true)
1039 }
1040
1041 fn blob_published(
1044 &mut self,
1045 blob_id: &BlobId,
1046 txn_tracker: &mut TransactionTracker,
1047 ) -> Result<(), ExecutionError> {
1048 self.used_blobs.insert(blob_id)?;
1049 txn_tracker.add_published_blob(*blob_id);
1050 Ok(())
1051 }
1052
1053 pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1054 match self.context().extra().get_blob(blob_id).await {
1055 Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
1056 Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1057 Err(error) => Err(error.into()),
1058 }
1059 }
1060
1061 pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1062 if self.context().extra().contains_blob(blob_id).await? {
1063 Ok(())
1064 } else {
1065 Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1066 }
1067 }
1068
1069 async fn check_bytecode_blobs(
1070 &self,
1071 module_id: &ModuleId,
1072 txn_tracker: &TransactionTracker,
1073 ) -> Result<Vec<BlobId>, ExecutionError> {
1074 let blob_ids = module_id.bytecode_blob_ids();
1075
1076 let mut missing_blobs = Vec::new();
1077 for blob_id in &blob_ids {
1078 if txn_tracker.created_blobs().contains_key(blob_id) {
1080 continue; }
1082 if !self.context().extra().contains_blob(*blob_id).await? {
1084 missing_blobs.push(*blob_id);
1085 }
1086 }
1087 ensure!(
1088 missing_blobs.is_empty(),
1089 ExecutionError::BlobsNotFound(missing_blobs)
1090 );
1091
1092 Ok(blob_ids)
1093 }
1094}