// Unit tests for this module live in a separate file and are only compiled in test builds.
#[cfg(test)]
#[path = "./unit_tests/system_tests.rs"]
mod tests;
8
9use std::{
10 collections::{BTreeMap, BTreeSet, HashSet},
11 mem,
12};
13
14use custom_debug_derive::Debug;
15use linera_base::{
16 crypto::CryptoHash,
17 data_types::{
18 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
19 ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
20 },
21 ensure, hex_debug,
22 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
23 ownership::{ChainOwnership, TimeoutConfig},
24};
25use linera_views::{
26 context::Context,
27 map_view::{HashedMapView, MapView},
28 register_view::HashedRegisterView,
29 set_view::HashedSetView,
30 views::{ClonableView, HashableView, View},
31};
32use serde::{Deserialize, Serialize};
33
34#[cfg(test)]
35use crate::test_utils::SystemExecutionState;
36use crate::{
37 committee::Committee, ApplicationDescription, ApplicationId, ExecutionError,
38 ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
39 QueryContext, QueryOutcome, ResourceController, TransactionTracker,
40};
41
/// Name of the system event stream used for new epochs: `AdminOperation::CreateCommittee`
/// emits an event on it, indexed by the epoch number, whose payload is the BCS-encoded hash
/// of the committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// Name of the system event stream used for removed epochs: `AdminOperation::RemoveCommittee`
/// emits an event with an empty payload on it, indexed by the removed epoch's number.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
46
/// Prometheus metrics for system operations; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::IntCounterVec;

    /// Counter incremented each time an `OpenChain` operation is executed.
    /// It is registered lazily, on first access.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let name = "open_chain_count";
        let help = "The number of times the `OpenChain` operation was executed";
        prometheus_util::register_int_counter_vec(name, help, &[])
    });
}
63
/// A view accessing the execution state of the system of a chain.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
    /// The chain's description; `None` until `initialize_chain` has stored it.
    pub description: HashedRegisterView<C, Option<ChainDescription>>,
    /// The chain's current epoch; used as the lookup key into `committees`.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The ID of the admin chain, taken from the network description during initialization.
    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees known to this chain, keyed by epoch.
    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// The ownership configuration of the chain.
    pub ownership: HashedRegisterView<C, ChainOwnership>,
    /// The balance of the chain's own account (`AccountOwner::CHAIN`).
    pub balance: HashedRegisterView<C, Amount>,
    /// The balances of all other account owners. An entry is removed when a debit brings it
    /// down to zero.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// The chain's timestamp; initialized from the chain description.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether the chain has been closed; set by `close_chain`.
    pub closed: HashedRegisterView<C, bool>,
    /// The application permissions in effect on this chain.
    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
    /// IDs of the blobs that have been used, published or created through this state.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// Subscription state per `(chain, stream)` pair, advanced by `SystemOperation::UpdateStreams`.
    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
}
95
/// The applications subscribed to one event stream, and how far that stream has been processed.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct EventSubscriptions {
    /// The stream index recorded by the last `UpdateStreams`. A new update must carry a
    /// strictly larger index, which then replaces this value.
    pub next_index: u32,
    /// The applications for which a stream range is queued for processing whenever the
    /// stream advances.
    pub applications: BTreeSet<ApplicationId>,
}
105
/// The user-chosen part of a new chain's configuration, carried by `SystemOperation::OpenChain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial balance of the new chain; it is debited from the parent chain's balance.
    pub balance: Amount,
    /// The application permissions of the new chain.
    pub application_permissions: ApplicationPermissions,
}
116
117impl OpenChainConfig {
118 pub fn init_chain_config(
121 &self,
122 epoch: Epoch,
123 min_active_epoch: Epoch,
124 max_active_epoch: Epoch,
125 ) -> InitialChainConfig {
126 InitialChainConfig {
127 application_permissions: self.application_permissions.clone(),
128 balance: self.balance,
129 epoch,
130 min_active_epoch,
131 max_active_epoch,
132 ownership: self.ownership.clone(),
133 }
134 }
135}
136
/// A system operation, executed by `SystemExecutionStateView::execute_operation`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
    /// Debits `amount` from `owner` on this chain and, unless the recipient is
    /// `Recipient::Burn`, sends a tracked `Credit` message to the recipient's chain.
    Transfer {
        owner: AccountOwner,
        recipient: Recipient,
        amount: Amount,
    },
    /// Sends a `Withdraw` message to chain `target_id`, asking it to debit `owner` there and
    /// forward `amount` to `recipient`.
    Claim {
        owner: AccountOwner,
        target_id: ChainId,
        recipient: Recipient,
        amount: Amount,
    },
    /// Creates the description blob of a new child chain and debits the configured balance
    /// from this chain.
    OpenChain(OpenChainConfig),
    /// Marks this chain as closed.
    CloseChain,
    /// Replaces the chain's ownership configuration.
    ChangeOwnership {
        /// The new super owners.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The new regular owners, each paired with a `u64` value.
        // NOTE(review): the `u64` is presumably the owner's weight; confirm against
        // `ChainOwnership`.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// Copied verbatim into `ChainOwnership::multi_leader_rounds`.
        multi_leader_rounds: u32,
        /// Copied verbatim into `ChainOwnership::open_multi_leader_rounds`.
        open_multi_leader_rounds: bool,
        /// Copied verbatim into `ChainOwnership::timeout_config`.
        timeout_config: TimeoutConfig,
    },
    /// Replaces the chain's application permissions.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Records the bytecode blobs of the module as published.
    PublishModule { module_id: ModuleId },
    /// Records the data blob with the given hash as published.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Checks that the blob exists and records it as used.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application from a published module.
    CreateApplication {
        /// The module the application is created from.
        module_id: ModuleId,
        /// The application parameters, stored in the application description.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The instantiation argument; `execute_operation` returns it together with the new
        /// application ID.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications this one depends on; each must have a readable description.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// An operation that is only accepted on the admin chain.
    Admin(AdminOperation),
    /// Switches to the given epoch, which must be the successor of the current one, using
    /// the committee announced on the admin chain's epoch event stream.
    ProcessNewEpoch(Epoch),
    /// Removes the committee of the given epoch and records the matching removal event of
    /// the admin chain.
    ProcessRemovedEpoch(Epoch),
    /// Advances subscribed event streams: each entry is `(chain, stream, next index)`.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
207
/// Operations that `execute_operation` rejects unless the executing chain is the admin chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
    /// Records the committee blob with the given hash as published.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers the committee stored in the given blob under `epoch` (which must be the
    /// successor of the current epoch), switches to that epoch and emits an event on the
    /// `EPOCH_STREAM_NAME` stream.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes the committee of `epoch` and emits an event on the
    /// `REMOVED_EPOCH_STREAM_NAME` stream. Fails if no such committee is known.
    RemoveCommittee { epoch: Epoch },
}
222
/// A system message, executed by `SystemExecutionStateView::execute_message`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
    /// Credits `amount` to `target`, or back to `source` if the message is bouncing.
    Credit {
        target: AccountOwner,
        amount: Amount,
        source: AccountOwner,
    },
    /// Debits `amount` from `owner` on the receiving chain and, unless the recipient is
    /// `Recipient::Burn`, forwards it to `recipient` as a tracked `Credit` message.
    Withdraw {
        owner: AccountOwner,
        amount: Amount,
        recipient: Recipient,
    },
    /// Handled as a no-op by `execute_message`.
    ApplicationCreated,
}
244
/// A query to the system state. It carries no data; `handle_query` ignores its value.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
248
/// The response to a [`SystemQuery`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the queried chain, taken from the query context.
    pub chain_id: ChainId,
    /// The balance of the chain's own account.
    pub balance: Amount,
}
255
/// The recipient of a transfer or claim.
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
pub enum Recipient {
    /// No account is credited: the tokens are debited from the source and no message is sent.
    Burn,
    /// The tokens are credited to this account through a `Credit` message.
    Account(Account),
}
264
265impl Recipient {
266 pub fn chain(chain_id: ChainId) -> Recipient {
268 Recipient::Account(Account::chain(chain_id))
269 }
270}
271
272impl From<ChainId> for Recipient {
273 fn from(chain_id: ChainId) -> Self {
274 Recipient::chain(chain_id)
275 }
276}
277
278impl From<Account> for Recipient {
279 fn from(account: Account) -> Self {
280 Recipient::Account(account)
281 }
282}
283
/// Optional user data, stored as a fixed-size array of 32 bytes.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
287
288impl UserData {
289 pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
290 let option_array = match opt_str {
292 Some(s) => {
293 let vec = s.into_bytes();
295 if vec.len() <= 32 {
296 let mut array = [b' '; 32];
298
299 let len = vec.len().min(32);
301 array[..len].copy_from_slice(&vec[..len]);
302
303 Some(array)
304 } else {
305 return Err(vec.len());
306 }
307 }
308 None => None,
309 };
310
311 Ok(UserData(option_array))
313 }
314}
315
/// The result of `SystemExecutionStateView::create_application`.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
    /// The transaction tracker that was passed in by value, handed back to the caller.
    pub txn_tracker: TransactionTracker,
}
321
322impl<C> SystemExecutionStateView<C>
323where
324 C: Context + Clone + Send + Sync + 'static,
325 C::Extra: ExecutionRuntimeContext,
326{
327 pub fn is_active(&self) -> bool {
329 self.description.get().is_some()
330 && self.ownership.get().is_active()
331 && self.current_committee().is_some()
332 && self.admin_id.get().is_some()
333 }
334
335 pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
337 let epoch = self.epoch.get();
338 let committee = self.committees.get().get(epoch)?;
339 Some((*epoch, committee))
340 }
341
342 async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
343 match self.context().extra().get_event(event_id.clone()).await? {
344 None => Err(ExecutionError::EventsNotFound(vec![event_id])),
345 Some(vec) => Ok(vec),
346 }
347 }
348
/// Executes a system operation against this state.
///
/// Outgoing messages, events, blobs and oracle responses produced by the operation are
/// recorded in `txn_tracker`.
///
/// Returns `Some((application_id, instantiation_argument))` when the operation was
/// `CreateApplication`, and `None` for every other operation.
pub async fn execute_operation(
    &mut self,
    context: OperationContext,
    operation: SystemOperation,
    txn_tracker: &mut TransactionTracker,
    resource_controller: &mut ResourceController<Option<AccountOwner>>,
) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
    use SystemOperation::*;
    let mut new_application = None;
    match operation {
        OpenChain(config) => {
            // The ID of the new chain is not needed here.
            let _chain_id = self
                .open_chain(
                    config,
                    context.chain_id,
                    context.height,
                    context.timestamp,
                    txn_tracker,
                )
                .await?;
            #[cfg(with_metrics)]
            metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
        }
        ChangeOwnership {
            super_owners,
            owners,
            multi_leader_rounds,
            open_multi_leader_rounds,
            timeout_config,
        } => {
            // The owner lists are collected into the collection types of `ChainOwnership`.
            self.ownership.set(ChainOwnership {
                super_owners: super_owners.into_iter().collect(),
                owners: owners.into_iter().collect(),
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            });
        }
        ChangeApplicationPermissions(application_permissions) => {
            self.application_permissions.set(application_permissions);
        }
        CloseChain => self.close_chain().await?,
        Transfer {
            owner,
            amount,
            recipient,
        } => {
            // `transfer` returns no message when the recipient is `Recipient::Burn`.
            let maybe_message = self
                .transfer(context.authenticated_signer, None, owner, recipient, amount)
                .await?;
            txn_tracker.add_outgoing_messages(maybe_message)?;
        }
        Claim {
            owner,
            target_id,
            recipient,
            amount,
        } => {
            let message = self
                .claim(
                    context.authenticated_signer,
                    None,
                    owner,
                    target_id,
                    recipient,
                    amount,
                )
                .await?;
            txn_tracker.add_outgoing_message(message)?;
        }
        Admin(admin_operation) => {
            // Admin operations are only accepted when this chain is the admin chain.
            ensure!(
                *self.admin_id.get() == Some(context.chain_id),
                ExecutionError::AdminOperationOnNonAdminChain
            );
            match admin_operation {
                AdminOperation::PublishCommitteeBlob { blob_hash } => {
                    self.blob_published(
                        &BlobId::new(blob_hash, BlobType::Committee),
                        txn_tracker,
                    )?;
                }
                AdminOperation::CreateCommittee { epoch, blob_hash } => {
                    self.check_next_epoch(epoch)?;
                    // The committee is decoded from the content of the committee blob.
                    let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                    let committee =
                        bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                    self.blob_used(txn_tracker, blob_id).await?;
                    self.committees.get_mut().insert(epoch, committee);
                    self.epoch.set(epoch);
                    // Announce the new epoch: the event payload is the BCS-encoded blob hash.
                    txn_tracker.add_event(
                        StreamId::system(EPOCH_STREAM_NAME),
                        epoch.0,
                        bcs::to_bytes(&blob_hash)?,
                    );
                }
                AdminOperation::RemoveCommittee { epoch } => {
                    // Fails if no committee is stored for `epoch`.
                    ensure!(
                        self.committees.get_mut().remove(&epoch).is_some(),
                        ExecutionError::InvalidCommitteeRemoval
                    );
                    txn_tracker.add_event(
                        StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                        epoch.0,
                        vec![],
                    );
                }
            }
        }
        PublishModule { module_id } => {
            for blob_id in module_id.bytecode_blob_ids() {
                self.blob_published(&blob_id, txn_tracker)?;
            }
        }
        CreateApplication {
            module_id,
            parameters,
            instantiation_argument,
            required_application_ids,
        } => {
            // `create_application` takes the tracker by value, so it is moved out here and
            // put back on success.
            // NOTE(review): if `create_application` fails, `*txn_tracker` is left holding
            // the `Default` value produced by `mem::take` — confirm callers discard the
            // tracker on error.
            let txn_tracker_moved = mem::take(txn_tracker);
            let CreateApplicationResult {
                app_id,
                txn_tracker: txn_tracker_moved,
            } = self
                .create_application(
                    context.chain_id,
                    context.height,
                    module_id,
                    parameters,
                    required_application_ids,
                    txn_tracker_moved,
                )
                .await?;
            *txn_tracker = txn_tracker_moved;
            new_application = Some((app_id, instantiation_argument));
        }
        PublishDataBlob { blob_hash } => {
            self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
        }
        VerifyBlob { blob_id } => {
            self.assert_blob_exists(blob_id).await?;
            // NOTE(review): the argument `0` is presumably a byte count, since the blob
            // content itself is not read here — confirm against `track_blob_read`.
            resource_controller
                .with_state(self)
                .await?
                .track_blob_read(0)?;
            self.blob_used(txn_tracker, blob_id).await?;
        }
        ProcessNewEpoch(epoch) => {
            self.check_next_epoch(epoch)?;
            let admin_id = self
                .admin_id
                .get()
                .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
            // The event published by the admin chain for this epoch.
            let event_id = EventId {
                chain_id: admin_id,
                stream_id: StreamId::system(EPOCH_STREAM_NAME),
                index: epoch.0,
            };
            // Use the replayed oracle response if there is one (it must be for this very
            // event); otherwise fetch the event from the runtime context.
            let bytes = match txn_tracker.next_replayed_oracle_response()? {
                None => self.get_event(event_id.clone()).await?,
                Some(OracleResponse::Event(recorded_event_id, bytes))
                    if recorded_event_id == event_id =>
                {
                    bytes
                }
                Some(_) => return Err(ExecutionError::OracleResponseMismatch),
            };
            // The event payload is the BCS-encoded hash of the committee blob.
            let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
            txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
            let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
            self.blob_used(txn_tracker, blob_id).await?;
            self.committees.get_mut().insert(epoch, committee);
            self.epoch.set(epoch);
        }
        ProcessRemovedEpoch(epoch) => {
            // The committee is removed before the removal event is looked up below.
            ensure!(
                self.committees.get_mut().remove(&epoch).is_some(),
                ExecutionError::InvalidCommitteeRemoval
            );
            let admin_id = self
                .admin_id
                .get()
                .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
            let event_id = EventId {
                chain_id: admin_id,
                stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                index: epoch.0,
            };
            // Same replay-or-fetch logic as for `ProcessNewEpoch`.
            let bytes = match txn_tracker.next_replayed_oracle_response()? {
                None => self.get_event(event_id.clone()).await?,
                Some(OracleResponse::Event(recorded_event_id, bytes))
                    if recorded_event_id == event_id =>
                {
                    bytes
                }
                Some(_) => return Err(ExecutionError::OracleResponseMismatch),
            };
            txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
        }
        UpdateStreams(streams) => {
            let mut missing_events = Vec::new();
            for (chain_id, stream_id, next_index) in streams {
                let subscriptions = self
                    .event_subscriptions
                    .get_mut_or_default(&(chain_id, stream_id.clone()))
                    .await?;
                // An update must strictly advance the stream.
                ensure!(
                    subscriptions.next_index < next_index,
                    ExecutionError::OutdatedUpdateStreams
                );
                // Queue the range from the previous index to the new one for every
                // subscribed application.
                for application_id in &subscriptions.applications {
                    txn_tracker.add_stream_to_process(
                        *application_id,
                        chain_id,
                        stream_id.clone(),
                        subscriptions.next_index,
                        next_index,
                    );
                }
                subscriptions.next_index = next_index;
                // Only the event just below the new index is checked for existence.
                let index = next_index
                    .checked_sub(1)
                    .ok_or(ArithmeticError::Underflow)?;
                let event_id = EventId {
                    chain_id,
                    stream_id,
                    index,
                };
                if !self
                    .context()
                    .extra()
                    .contains_event(event_id.clone())
                    .await?
                {
                    missing_events.push(event_id)
                }
            }
            // All missing events are reported together in a single error.
            ensure!(
                missing_events.is_empty(),
                ExecutionError::EventsNotFound(missing_events)
            );
        }
    }

    Ok(new_application)
}
598
599 fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
602 let expected = self.epoch.get().try_add_one()?;
603 ensure!(
604 provided == expected,
605 ExecutionError::InvalidCommitteeEpoch { provided, expected }
606 );
607 Ok(())
608 }
609
610 pub async fn transfer(
611 &mut self,
612 authenticated_signer: Option<AccountOwner>,
613 authenticated_application_id: Option<ApplicationId>,
614 source: AccountOwner,
615 recipient: Recipient,
616 amount: Amount,
617 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
618 if source == AccountOwner::CHAIN {
619 ensure!(
620 authenticated_signer.is_some()
621 && self
622 .ownership
623 .get()
624 .verify_owner(&authenticated_signer.unwrap()),
625 ExecutionError::UnauthenticatedTransferOwner
626 );
627 } else {
628 ensure!(
629 authenticated_signer == Some(source)
630 || authenticated_application_id.map(AccountOwner::from) == Some(source),
631 ExecutionError::UnauthenticatedTransferOwner
632 );
633 }
634 ensure!(
635 amount > Amount::ZERO,
636 ExecutionError::IncorrectTransferAmount
637 );
638 self.debit(&source, amount).await?;
639 match recipient {
640 Recipient::Account(account) => {
641 let message = SystemMessage::Credit {
642 amount,
643 source,
644 target: account.owner,
645 };
646 Ok(Some(
647 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
648 ))
649 }
650 Recipient::Burn => Ok(None),
651 }
652 }
653
654 pub async fn claim(
655 &self,
656 authenticated_signer: Option<AccountOwner>,
657 authenticated_application_id: Option<ApplicationId>,
658 source: AccountOwner,
659 target_id: ChainId,
660 recipient: Recipient,
661 amount: Amount,
662 ) -> Result<OutgoingMessage, ExecutionError> {
663 ensure!(
664 authenticated_signer == Some(source)
665 || authenticated_application_id.map(AccountOwner::from) == Some(source),
666 ExecutionError::UnauthenticatedClaimOwner
667 );
668 ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
669
670 let message = SystemMessage::Withdraw {
671 amount,
672 owner: source,
673 recipient,
674 };
675 Ok(
676 OutgoingMessage::new(target_id, message)
677 .with_authenticated_signer(authenticated_signer),
678 )
679 }
680
681 async fn debit(
683 &mut self,
684 account: &AccountOwner,
685 amount: Amount,
686 ) -> Result<(), ExecutionError> {
687 let balance = if account == &AccountOwner::CHAIN {
688 self.balance.get_mut()
689 } else {
690 self.balances.get_mut(account).await?.ok_or_else(|| {
691 ExecutionError::InsufficientBalance {
692 balance: Amount::ZERO,
693 account: *account,
694 }
695 })?
696 };
697
698 balance
699 .try_sub_assign(amount)
700 .map_err(|_| ExecutionError::InsufficientBalance {
701 balance: *balance,
702 account: *account,
703 })?;
704
705 if account != &AccountOwner::CHAIN && balance.is_zero() {
706 self.balances.remove(account)?;
707 }
708
709 Ok(())
710 }
711
712 pub async fn execute_message(
714 &mut self,
715 context: MessageContext,
716 message: SystemMessage,
717 ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
718 let mut outcome = Vec::new();
719 use SystemMessage::*;
720 match message {
721 Credit {
722 amount,
723 source,
724 target,
725 } => {
726 let receiver = if context.is_bouncing { source } else { target };
727 if receiver == AccountOwner::CHAIN {
728 let new_balance = self.balance.get().saturating_add(amount);
729 self.balance.set(new_balance);
730 } else {
731 let balance = self.balances.get_mut_or_default(&receiver).await?;
732 *balance = balance.saturating_add(amount);
733 }
734 }
735 Withdraw {
736 amount,
737 owner,
738 recipient,
739 } => {
740 self.debit(&owner, amount).await?;
741 match recipient {
742 Recipient::Account(account) => {
743 let message = SystemMessage::Credit {
744 amount,
745 source: owner,
746 target: account.owner,
747 };
748 outcome.push(
749 OutgoingMessage::new(account.chain_id, message)
750 .with_kind(MessageKind::Tracked),
751 );
752 }
753 Recipient::Burn => (),
754 }
755 }
756 ApplicationCreated => {}
758 }
759 Ok(outcome)
760 }
761
762 pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
765 if self.description.get().is_some() {
766 return Ok(true);
768 }
769 let description_blob = self
770 .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
771 .await?;
772 let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
773 let InitialChainConfig {
774 ownership,
775 epoch,
776 balance,
777 min_active_epoch,
778 max_active_epoch,
779 application_permissions,
780 } = description.config().clone();
781 self.timestamp.set(description.timestamp());
782 self.description.set(Some(description));
783 self.epoch.set(epoch);
784 let committees = self
785 .context()
786 .extra()
787 .committees_for(min_active_epoch..=max_active_epoch)
788 .await?;
789 self.committees.set(committees);
790 let admin_id = self
791 .context()
792 .extra()
793 .get_network_description()
794 .await?
795 .ok_or(ExecutionError::NoNetworkDescriptionFound)?
796 .admin_chain_id;
797 self.admin_id.set(Some(admin_id));
798 self.ownership.set(ownership);
799 self.balance.set(balance);
800 self.application_permissions.set(application_permissions);
801 Ok(false)
802 }
803
804 pub async fn handle_query(
805 &mut self,
806 context: QueryContext,
807 _query: SystemQuery,
808 ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
809 let response = SystemResponse {
810 chain_id: context.chain_id,
811 balance: *self.balance.get(),
812 };
813 Ok(QueryOutcome {
814 response,
815 operations: vec![],
816 })
817 }
818
819 pub async fn open_chain(
822 &mut self,
823 config: OpenChainConfig,
824 parent: ChainId,
825 block_height: BlockHeight,
826 timestamp: Timestamp,
827 txn_tracker: &mut TransactionTracker,
828 ) -> Result<ChainId, ExecutionError> {
829 let chain_index = txn_tracker.next_chain_index();
830 let chain_origin = ChainOrigin::Child {
831 parent,
832 block_height,
833 chain_index,
834 };
835 let init_chain_config = config.init_chain_config(
836 *self.epoch.get(),
837 self.committees
838 .get()
839 .keys()
840 .min()
841 .copied()
842 .unwrap_or(Epoch::ZERO),
843 self.committees
844 .get()
845 .keys()
846 .max()
847 .copied()
848 .unwrap_or(Epoch::ZERO),
849 );
850 let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
851 let child_id = chain_description.id();
852 self.debit(&AccountOwner::CHAIN, config.balance).await?;
853 let blob = Blob::new_chain_description(&chain_description);
854 txn_tracker.add_created_blob(blob);
855 Ok(child_id)
856 }
857
858 pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
859 self.closed.set(true);
860 Ok(())
861 }
862
863 pub async fn create_application(
864 &mut self,
865 chain_id: ChainId,
866 block_height: BlockHeight,
867 module_id: ModuleId,
868 parameters: Vec<u8>,
869 required_application_ids: Vec<ApplicationId>,
870 mut txn_tracker: TransactionTracker,
871 ) -> Result<CreateApplicationResult, ExecutionError> {
872 let application_index = txn_tracker.next_application_index();
873
874 let blob_ids = self.check_bytecode_blobs(&module_id).await?;
875 for blob_id in blob_ids {
878 self.blob_used(&mut txn_tracker, blob_id).await?;
879 }
880
881 let application_description = ApplicationDescription {
882 module_id,
883 creator_chain_id: chain_id,
884 block_height,
885 application_index,
886 parameters,
887 required_application_ids,
888 };
889 self.check_required_applications(&application_description, &mut txn_tracker)
890 .await?;
891
892 let blob = Blob::new_application_description(&application_description);
893 self.used_blobs.insert(&blob.id())?;
894 txn_tracker.add_created_blob(blob);
895
896 Ok(CreateApplicationResult {
897 app_id: ApplicationId::from(&application_description),
898 txn_tracker,
899 })
900 }
901
902 async fn check_required_applications(
903 &mut self,
904 application_description: &ApplicationDescription,
905 txn_tracker: &mut TransactionTracker,
906 ) -> Result<(), ExecutionError> {
907 for required_id in &application_description.required_application_ids {
909 Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
910 }
911 Ok(())
912 }
913
914 pub async fn describe_application(
916 &mut self,
917 id: ApplicationId,
918 txn_tracker: &mut TransactionTracker,
919 ) -> Result<ApplicationDescription, ExecutionError> {
920 let blob_id = id.description_blob_id();
921 let blob_content = match txn_tracker.created_blobs().get(&blob_id) {
922 Some(blob) => blob.content().clone(),
923 None => self.read_blob_content(blob_id).await?,
924 };
925 self.blob_used(txn_tracker, blob_id).await?;
926 let description: ApplicationDescription = bcs::from_bytes(blob_content.bytes())?;
927
928 let blob_ids = self.check_bytecode_blobs(&description.module_id).await?;
929 for blob_id in blob_ids {
932 self.blob_used(txn_tracker, blob_id).await?;
933 }
934
935 self.check_required_applications(&description, txn_tracker)
936 .await?;
937
938 Ok(description)
939 }
940
941 pub async fn find_dependencies(
943 &mut self,
944 mut stack: Vec<ApplicationId>,
945 txn_tracker: &mut TransactionTracker,
946 ) -> Result<Vec<ApplicationId>, ExecutionError> {
947 let mut result = Vec::new();
949 let mut sorted = HashSet::new();
951 let mut seen = HashSet::new();
953
954 while let Some(id) = stack.pop() {
955 if sorted.contains(&id) {
956 continue;
957 }
958 if seen.contains(&id) {
959 sorted.insert(id);
962 result.push(id);
963 continue;
964 }
965 seen.insert(id);
968 stack.push(id);
970 let app = self.describe_application(id, txn_tracker).await?;
971 for child in app.required_application_ids.iter().rev() {
972 if !seen.contains(child) {
973 stack.push(*child);
974 }
975 }
976 }
977 Ok(result)
978 }
979
980 pub(crate) async fn blob_used(
983 &mut self,
984 txn_tracker: &mut TransactionTracker,
985 blob_id: BlobId,
986 ) -> Result<bool, ExecutionError> {
987 if self.used_blobs.contains(&blob_id).await? {
988 return Ok(false); }
990 self.used_blobs.insert(&blob_id)?;
991 txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
992 Ok(true)
993 }
994
995 fn blob_published(
998 &mut self,
999 blob_id: &BlobId,
1000 txn_tracker: &mut TransactionTracker,
1001 ) -> Result<(), ExecutionError> {
1002 self.used_blobs.insert(blob_id)?;
1003 txn_tracker.add_published_blob(*blob_id);
1004 Ok(())
1005 }
1006
1007 pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1008 match self.context().extra().get_blob(blob_id).await {
1009 Ok(Some(blob)) => Ok(blob.into()),
1010 Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1011 Err(error) => Err(error.into()),
1012 }
1013 }
1014
1015 pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1016 if self.context().extra().contains_blob(blob_id).await? {
1017 Ok(())
1018 } else {
1019 Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1020 }
1021 }
1022
1023 async fn check_bytecode_blobs(
1024 &mut self,
1025 module_id: &ModuleId,
1026 ) -> Result<Vec<BlobId>, ExecutionError> {
1027 let blob_ids = module_id.bytecode_blob_ids();
1028
1029 let mut missing_blobs = Vec::new();
1030 for blob_id in &blob_ids {
1031 if !self.context().extra().contains_blob(*blob_id).await? {
1032 missing_blobs.push(*blob_id);
1033 }
1034 }
1035 ensure!(
1036 missing_blobs.is_empty(),
1037 ExecutionError::BlobsNotFound(missing_blobs)
1038 );
1039
1040 Ok(blob_ids)
1041 }
1042}