1#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10 collections::{BTreeMap, BTreeSet, HashSet},
11 mem,
12};
13
14use custom_debug_derive::Debug;
15use linera_base::{
16 crypto::CryptoHash,
17 data_types::{
18 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
19 ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
20 },
21 ensure, hex_debug,
22 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
23 ownership::{ChainOwnership, TimeoutConfig},
24};
25use linera_views::{
26 context::Context,
27 map_view::{HashedMapView, MapView},
28 register_view::HashedRegisterView,
29 set_view::HashedSetView,
30 views::{ClonableView, HashableView, View},
31};
32use serde::{Deserialize, Serialize};
33
34#[cfg(test)]
35use crate::test_utils::SystemExecutionState;
36use crate::{
37 committee::Committee, ApplicationDescription, ApplicationId, ExecutionError,
38 ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
39 QueryContext, QueryOutcome, ResourceController, TransactionTracker,
40};
41
42pub static EPOCH_STREAM_NAME: &[u8] = &[0];
44pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
46
47#[cfg(with_metrics)]
49mod metrics {
50 use std::sync::LazyLock;
51
52 use linera_base::prometheus_util::register_int_counter_vec;
53 use prometheus::IntCounterVec;
54
55 pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
56 register_int_counter_vec(
57 "open_chain_count",
58 "The number of times the `OpenChain` operation was executed",
59 &[],
60 )
61 });
62}
63
64#[derive(Debug, ClonableView, HashableView)]
66pub struct SystemExecutionStateView<C> {
67 pub description: HashedRegisterView<C, Option<ChainDescription>>,
69 pub epoch: HashedRegisterView<C, Epoch>,
71 pub admin_id: HashedRegisterView<C, Option<ChainId>>,
73 pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
78 pub ownership: HashedRegisterView<C, ChainOwnership>,
80 pub balance: HashedRegisterView<C, Amount>,
82 pub balances: HashedMapView<C, AccountOwner, Amount>,
84 pub timestamp: HashedRegisterView<C, Timestamp>,
86 pub closed: HashedRegisterView<C, bool>,
88 pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
90 pub used_blobs: HashedSetView<C, BlobId>,
92 pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
94}
95
96#[derive(Debug, Default, Clone, Serialize, Deserialize)]
98pub struct EventSubscriptions {
99 pub next_index: u32,
102 pub applications: BTreeSet<ApplicationId>,
104}
105
106#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
108pub struct OpenChainConfig {
109 pub ownership: ChainOwnership,
111 pub balance: Amount,
113 pub application_permissions: ApplicationPermissions,
115}
116
117impl OpenChainConfig {
118 pub fn init_chain_config(
121 &self,
122 epoch: Epoch,
123 committees: BTreeMap<Epoch, Vec<u8>>,
124 ) -> InitialChainConfig {
125 InitialChainConfig {
126 application_permissions: self.application_permissions.clone(),
127 balance: self.balance,
128 committees,
129 epoch,
130 ownership: self.ownership.clone(),
131 }
132 }
133}
134
135#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
137pub enum SystemOperation {
138 Transfer {
141 owner: AccountOwner,
142 recipient: Recipient,
143 amount: Amount,
144 },
145 Claim {
149 owner: AccountOwner,
150 target_id: ChainId,
151 recipient: Recipient,
152 amount: Amount,
153 },
154 OpenChain(OpenChainConfig),
157 CloseChain,
159 ChangeOwnership {
161 #[debug(skip_if = Vec::is_empty)]
163 super_owners: Vec<AccountOwner>,
164 #[debug(skip_if = Vec::is_empty)]
166 owners: Vec<(AccountOwner, u64)>,
167 multi_leader_rounds: u32,
169 open_multi_leader_rounds: bool,
173 timeout_config: TimeoutConfig,
175 },
176 ChangeApplicationPermissions(ApplicationPermissions),
178 PublishModule { module_id: ModuleId },
180 PublishDataBlob { blob_hash: CryptoHash },
182 ReadBlob { blob_id: BlobId },
185 CreateApplication {
187 module_id: ModuleId,
188 #[serde(with = "serde_bytes")]
189 #[debug(with = "hex_debug")]
190 parameters: Vec<u8>,
191 #[serde(with = "serde_bytes")]
192 #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
193 instantiation_argument: Vec<u8>,
194 #[debug(skip_if = Vec::is_empty)]
195 required_application_ids: Vec<ApplicationId>,
196 },
197 Admin(AdminOperation),
199 ProcessNewEpoch(Epoch),
201 ProcessRemovedEpoch(Epoch),
203 UpdateStreams(Vec<(ChainId, StreamId, u32)>),
205}
206
207#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
209pub enum AdminOperation {
210 PublishCommitteeBlob { blob_hash: CryptoHash },
213 CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
216 RemoveCommittee { epoch: Epoch },
220}
221
222#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
224pub enum SystemMessage {
225 Credit {
228 target: AccountOwner,
229 amount: Amount,
230 source: AccountOwner,
231 },
232 Withdraw {
236 owner: AccountOwner,
237 amount: Amount,
238 recipient: Recipient,
239 },
240 ApplicationCreated,
242}
243
244#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
246pub struct SystemQuery;
247
248#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
250pub struct SystemResponse {
251 pub chain_id: ChainId,
252 pub balance: Amount,
253}
254
255#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
257pub enum Recipient {
258 Burn,
260 Account(Account),
262}
263
264impl Recipient {
265 pub fn chain(chain_id: ChainId) -> Recipient {
267 Recipient::Account(Account::chain(chain_id))
268 }
269}
270
271impl From<ChainId> for Recipient {
272 fn from(chain_id: ChainId) -> Self {
273 Recipient::chain(chain_id)
274 }
275}
276
277impl From<Account> for Recipient {
278 fn from(account: Account) -> Self {
279 Recipient::Account(account)
280 }
281}
282
283#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
285pub struct UserData(pub Option<[u8; 32]>);
286
287impl UserData {
288 pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
289 let option_array = match opt_str {
291 Some(s) => {
292 let vec = s.into_bytes();
294 if vec.len() <= 32 {
295 let mut array = [b' '; 32];
297
298 let len = vec.len().min(32);
300 array[..len].copy_from_slice(&vec[..len]);
301
302 Some(array)
303 } else {
304 return Err(vec.len());
305 }
306 }
307 None => None,
308 };
309
310 Ok(UserData(option_array))
312 }
313}
314
315#[derive(Debug)]
316pub struct CreateApplicationResult {
317 pub app_id: ApplicationId,
318 pub txn_tracker: TransactionTracker,
319}
320
321impl<C> SystemExecutionStateView<C>
322where
323 C: Context + Clone + Send + Sync + 'static,
324 C::Extra: ExecutionRuntimeContext,
325{
326 pub fn is_active(&self) -> bool {
328 self.description.get().is_some()
329 && self.ownership.get().is_active()
330 && self.current_committee().is_some()
331 && self.admin_id.get().is_some()
332 }
333
334 pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
336 let epoch = self.epoch.get();
337 let committee = self.committees.get().get(epoch)?;
338 Some((*epoch, committee))
339 }
340
341 pub fn get_committees(&self) -> BTreeMap<Epoch, Vec<u8>> {
343 self.committees
344 .get()
345 .iter()
346 .map(|(epoch, committee)| {
347 let serialized_committee =
348 bcs::to_bytes(committee).expect("Serializing a committee should not fail!");
349 (*epoch, serialized_committee)
350 })
351 .collect()
352 }
353
354 async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
355 match self.context().extra().get_event(event_id.clone()).await? {
356 None => Err(ExecutionError::EventsNotFound(vec![event_id])),
357 Some(vec) => Ok(vec),
358 }
359 }
360
361 pub async fn execute_operation(
364 &mut self,
365 context: OperationContext,
366 operation: SystemOperation,
367 txn_tracker: &mut TransactionTracker,
368 resource_controller: &mut ResourceController<Option<AccountOwner>>,
369 ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
370 use SystemOperation::*;
371 let mut new_application = None;
372 match operation {
373 OpenChain(config) => {
374 let _chain_id = self
375 .open_chain(
376 config,
377 context.chain_id,
378 context.height,
379 context.timestamp,
380 txn_tracker,
381 )
382 .await?;
383 #[cfg(with_metrics)]
384 metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
385 }
386 ChangeOwnership {
387 super_owners,
388 owners,
389 multi_leader_rounds,
390 open_multi_leader_rounds,
391 timeout_config,
392 } => {
393 self.ownership.set(ChainOwnership {
394 super_owners: super_owners.into_iter().collect(),
395 owners: owners.into_iter().collect(),
396 multi_leader_rounds,
397 open_multi_leader_rounds,
398 timeout_config,
399 });
400 }
401 ChangeApplicationPermissions(application_permissions) => {
402 self.application_permissions.set(application_permissions);
403 }
404 CloseChain => self.close_chain().await?,
405 Transfer {
406 owner,
407 amount,
408 recipient,
409 } => {
410 let maybe_message = self
411 .transfer(context.authenticated_signer, None, owner, recipient, amount)
412 .await?;
413 txn_tracker.add_outgoing_messages(maybe_message)?;
414 }
415 Claim {
416 owner,
417 target_id,
418 recipient,
419 amount,
420 } => {
421 let message = self
422 .claim(
423 context.authenticated_signer,
424 None,
425 owner,
426 target_id,
427 recipient,
428 amount,
429 )
430 .await?;
431 txn_tracker.add_outgoing_message(message)?;
432 }
433 Admin(admin_operation) => {
434 ensure!(
435 *self.admin_id.get() == Some(context.chain_id),
436 ExecutionError::AdminOperationOnNonAdminChain
437 );
438 match admin_operation {
439 AdminOperation::PublishCommitteeBlob { blob_hash } => {
440 self.blob_published(
441 &BlobId::new(blob_hash, BlobType::Committee),
442 txn_tracker,
443 )?;
444 }
445 AdminOperation::CreateCommittee { epoch, blob_hash } => {
446 self.check_next_epoch(epoch)?;
447 let blob_id = BlobId::new(blob_hash, BlobType::Committee);
448 let committee =
449 bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
450 self.blob_used(txn_tracker, blob_id).await?;
451 self.committees.get_mut().insert(epoch, committee);
452 self.epoch.set(epoch);
453 txn_tracker.add_event(
454 StreamId::system(EPOCH_STREAM_NAME),
455 epoch.0,
456 bcs::to_bytes(&blob_hash)?,
457 );
458 }
459 AdminOperation::RemoveCommittee { epoch } => {
460 ensure!(
461 self.committees.get_mut().remove(&epoch).is_some(),
462 ExecutionError::InvalidCommitteeRemoval
463 );
464 txn_tracker.add_event(
465 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
466 epoch.0,
467 vec![],
468 );
469 }
470 }
471 }
472 PublishModule { module_id } => {
473 for blob_id in module_id.bytecode_blob_ids() {
474 self.blob_published(&blob_id, txn_tracker)?;
475 }
476 }
477 CreateApplication {
478 module_id,
479 parameters,
480 instantiation_argument,
481 required_application_ids,
482 } => {
483 let txn_tracker_moved = mem::take(txn_tracker);
484 let CreateApplicationResult {
485 app_id,
486 txn_tracker: txn_tracker_moved,
487 } = self
488 .create_application(
489 context.chain_id,
490 context.height,
491 module_id,
492 parameters,
493 required_application_ids,
494 txn_tracker_moved,
495 )
496 .await?;
497 *txn_tracker = txn_tracker_moved;
498 new_application = Some((app_id, instantiation_argument));
499 }
500 PublishDataBlob { blob_hash } => {
501 self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
502 }
503 ReadBlob { blob_id } => {
504 let content = self.read_blob_content(blob_id).await?;
505 if blob_id.blob_type == BlobType::Data {
506 resource_controller
507 .with_state(self)
508 .await?
509 .track_blob_read(content.bytes().len() as u64)?;
510 }
511 self.blob_used(txn_tracker, blob_id).await?;
512 }
513 ProcessNewEpoch(epoch) => {
514 self.check_next_epoch(epoch)?;
515 let admin_id = self
516 .admin_id
517 .get()
518 .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
519 let event_id = EventId {
520 chain_id: admin_id,
521 stream_id: StreamId::system(EPOCH_STREAM_NAME),
522 index: epoch.0,
523 };
524 let bytes = match txn_tracker.next_replayed_oracle_response()? {
525 None => self.get_event(event_id.clone()).await?,
526 Some(OracleResponse::Event(recorded_event_id, bytes))
527 if recorded_event_id == event_id =>
528 {
529 bytes
530 }
531 Some(_) => return Err(ExecutionError::OracleResponseMismatch),
532 };
533 let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
534 txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
535 let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
536 self.blob_used(txn_tracker, blob_id).await?;
537 self.committees.get_mut().insert(epoch, committee);
538 self.epoch.set(epoch);
539 }
540 ProcessRemovedEpoch(epoch) => {
541 ensure!(
542 self.committees.get_mut().remove(&epoch).is_some(),
543 ExecutionError::InvalidCommitteeRemoval
544 );
545 let admin_id = self
546 .admin_id
547 .get()
548 .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
549 let event_id = EventId {
550 chain_id: admin_id,
551 stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
552 index: epoch.0,
553 };
554 let bytes = match txn_tracker.next_replayed_oracle_response()? {
555 None => self.get_event(event_id.clone()).await?,
556 Some(OracleResponse::Event(recorded_event_id, bytes))
557 if recorded_event_id == event_id =>
558 {
559 bytes
560 }
561 Some(_) => return Err(ExecutionError::OracleResponseMismatch),
562 };
563 txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
564 }
565 UpdateStreams(streams) => {
566 for (chain_id, stream_id, next_index) in streams {
567 let subscriptions = self
568 .event_subscriptions
569 .get_mut_or_default(&(chain_id, stream_id.clone()))
570 .await?;
571 ensure!(
572 subscriptions.next_index < next_index,
573 ExecutionError::OutdatedUpdateStreams
574 );
575 for application_id in &subscriptions.applications {
576 txn_tracker.add_stream_to_process(
577 *application_id,
578 chain_id,
579 stream_id.clone(),
580 subscriptions.next_index,
581 next_index,
582 );
583 }
584 subscriptions.next_index = next_index;
585 let index = next_index
586 .checked_sub(1)
587 .ok_or(ArithmeticError::Underflow)?;
588 let event_id = EventId {
589 chain_id,
590 stream_id,
591 index,
592 };
593 ensure!(
594 self.context()
595 .extra()
596 .contains_event(event_id.clone())
597 .await?,
598 ExecutionError::EventNotFound(event_id)
599 );
600 }
601 }
602 }
603
604 Ok(new_application)
605 }
606
607 fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
610 let expected = self.epoch.get().try_add_one()?;
611 ensure!(
612 provided == expected,
613 ExecutionError::InvalidCommitteeEpoch { provided, expected }
614 );
615 Ok(())
616 }
617
618 pub async fn transfer(
619 &mut self,
620 authenticated_signer: Option<AccountOwner>,
621 authenticated_application_id: Option<ApplicationId>,
622 source: AccountOwner,
623 recipient: Recipient,
624 amount: Amount,
625 ) -> Result<Option<OutgoingMessage>, ExecutionError> {
626 if source == AccountOwner::CHAIN {
627 ensure!(
628 authenticated_signer.is_some()
629 && self
630 .ownership
631 .get()
632 .verify_owner(&authenticated_signer.unwrap()),
633 ExecutionError::UnauthenticatedTransferOwner
634 );
635 } else {
636 ensure!(
637 authenticated_signer == Some(source)
638 || authenticated_application_id.map(AccountOwner::from) == Some(source),
639 ExecutionError::UnauthenticatedTransferOwner
640 );
641 }
642 ensure!(
643 amount > Amount::ZERO,
644 ExecutionError::IncorrectTransferAmount
645 );
646 self.debit(&source, amount).await?;
647 match recipient {
648 Recipient::Account(account) => {
649 let message = SystemMessage::Credit {
650 amount,
651 source,
652 target: account.owner,
653 };
654 Ok(Some(
655 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
656 ))
657 }
658 Recipient::Burn => Ok(None),
659 }
660 }
661
662 pub async fn claim(
663 &self,
664 authenticated_signer: Option<AccountOwner>,
665 authenticated_application_id: Option<ApplicationId>,
666 source: AccountOwner,
667 target_id: ChainId,
668 recipient: Recipient,
669 amount: Amount,
670 ) -> Result<OutgoingMessage, ExecutionError> {
671 ensure!(
672 authenticated_signer == Some(source)
673 || authenticated_application_id.map(AccountOwner::from) == Some(source),
674 ExecutionError::UnauthenticatedClaimOwner
675 );
676 ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
677
678 let message = SystemMessage::Withdraw {
679 amount,
680 owner: source,
681 recipient,
682 };
683 Ok(
684 OutgoingMessage::new(target_id, message)
685 .with_authenticated_signer(authenticated_signer),
686 )
687 }
688
689 async fn debit(
691 &mut self,
692 account: &AccountOwner,
693 amount: Amount,
694 ) -> Result<(), ExecutionError> {
695 let balance = if account == &AccountOwner::CHAIN {
696 self.balance.get_mut()
697 } else {
698 self.balances.get_mut(account).await?.ok_or_else(|| {
699 ExecutionError::InsufficientBalance {
700 balance: Amount::ZERO,
701 account: *account,
702 }
703 })?
704 };
705
706 balance
707 .try_sub_assign(amount)
708 .map_err(|_| ExecutionError::InsufficientBalance {
709 balance: *balance,
710 account: *account,
711 })?;
712
713 if account != &AccountOwner::CHAIN && balance.is_zero() {
714 self.balances.remove(account)?;
715 }
716
717 Ok(())
718 }
719
720 pub async fn execute_message(
722 &mut self,
723 context: MessageContext,
724 message: SystemMessage,
725 ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
726 let mut outcome = Vec::new();
727 use SystemMessage::*;
728 match message {
729 Credit {
730 amount,
731 source,
732 target,
733 } => {
734 let receiver = if context.is_bouncing { source } else { target };
735 if receiver == AccountOwner::CHAIN {
736 let new_balance = self.balance.get().saturating_add(amount);
737 self.balance.set(new_balance);
738 } else {
739 let balance = self.balances.get_mut_or_default(&receiver).await?;
740 *balance = balance.saturating_add(amount);
741 }
742 }
743 Withdraw {
744 amount,
745 owner,
746 recipient,
747 } => {
748 self.debit(&owner, amount).await?;
749 match recipient {
750 Recipient::Account(account) => {
751 let message = SystemMessage::Credit {
752 amount,
753 source: owner,
754 target: account.owner,
755 };
756 outcome.push(
757 OutgoingMessage::new(account.chain_id, message)
758 .with_kind(MessageKind::Tracked),
759 );
760 }
761 Recipient::Burn => (),
762 }
763 }
764 ApplicationCreated => {}
766 }
767 Ok(outcome)
768 }
769
770 pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
773 if self.description.get().is_some() {
774 return Ok(true);
776 }
777 let description_blob = self
778 .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
779 .await?;
780 let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
781 let InitialChainConfig {
782 ownership,
783 epoch,
784 committees,
785 balance,
786 application_permissions,
787 } = description.config().clone();
788 self.timestamp.set(description.timestamp());
789 self.description.set(Some(description));
790 self.epoch.set(epoch);
791 let committees = committees
792 .into_iter()
793 .map(|(epoch, serialized_committee)| {
794 let committee = bcs::from_bytes(&serialized_committee)
795 .expect("Deserializing a committee shouldn't fail");
796 (epoch, committee)
797 })
798 .collect();
799 self.committees.set(committees);
800 let admin_id = self
802 .context()
803 .extra()
804 .get_network_description()
805 .await?
806 .ok_or(ExecutionError::NoNetworkDescriptionFound)?
807 .admin_chain_id;
808 self.admin_id.set(Some(admin_id));
809 self.ownership.set(ownership);
810 self.balance.set(balance);
811 self.application_permissions.set(application_permissions);
812 Ok(false)
813 }
814
815 pub async fn handle_query(
816 &mut self,
817 context: QueryContext,
818 _query: SystemQuery,
819 ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
820 let response = SystemResponse {
821 chain_id: context.chain_id,
822 balance: *self.balance.get(),
823 };
824 Ok(QueryOutcome {
825 response,
826 operations: vec![],
827 })
828 }
829
830 pub async fn open_chain(
833 &mut self,
834 config: OpenChainConfig,
835 parent: ChainId,
836 block_height: BlockHeight,
837 timestamp: Timestamp,
838 txn_tracker: &mut TransactionTracker,
839 ) -> Result<ChainId, ExecutionError> {
840 let chain_index = txn_tracker.next_chain_index();
841 let chain_origin = ChainOrigin::Child {
842 parent,
843 block_height,
844 chain_index,
845 };
846 let committees = self.get_committees();
847 let init_chain_config = config.init_chain_config(*self.epoch.get(), committees);
848 let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
849 let child_id = chain_description.id();
850 self.debit(&AccountOwner::CHAIN, config.balance).await?;
851 let blob = Blob::new_chain_description(&chain_description);
852 txn_tracker.add_created_blob(blob);
853 Ok(child_id)
854 }
855
856 pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
857 self.closed.set(true);
858 Ok(())
859 }
860
861 pub async fn create_application(
862 &mut self,
863 chain_id: ChainId,
864 block_height: BlockHeight,
865 module_id: ModuleId,
866 parameters: Vec<u8>,
867 required_application_ids: Vec<ApplicationId>,
868 mut txn_tracker: TransactionTracker,
869 ) -> Result<CreateApplicationResult, ExecutionError> {
870 let application_index = txn_tracker.next_application_index();
871
872 let blob_ids = self.check_bytecode_blobs(&module_id).await?;
873 for blob_id in blob_ids {
876 self.blob_used(&mut txn_tracker, blob_id).await?;
877 }
878
879 let application_description = ApplicationDescription {
880 module_id,
881 creator_chain_id: chain_id,
882 block_height,
883 application_index,
884 parameters,
885 required_application_ids,
886 };
887 self.check_required_applications(&application_description, &mut txn_tracker)
888 .await?;
889
890 let blob = Blob::new_application_description(&application_description);
891 self.used_blobs.insert(&blob.id())?;
892 txn_tracker.add_created_blob(blob);
893
894 Ok(CreateApplicationResult {
895 app_id: ApplicationId::from(&application_description),
896 txn_tracker,
897 })
898 }
899
900 async fn check_required_applications(
901 &mut self,
902 application_description: &ApplicationDescription,
903 txn_tracker: &mut TransactionTracker,
904 ) -> Result<(), ExecutionError> {
905 for required_id in &application_description.required_application_ids {
907 Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
908 }
909 Ok(())
910 }
911
912 pub async fn describe_application(
914 &mut self,
915 id: ApplicationId,
916 txn_tracker: &mut TransactionTracker,
917 ) -> Result<ApplicationDescription, ExecutionError> {
918 let blob_id = id.description_blob_id();
919 let blob_content = match txn_tracker.created_blobs().get(&blob_id) {
920 Some(blob) => blob.content().clone(),
921 None => self.read_blob_content(blob_id).await?,
922 };
923 self.blob_used(txn_tracker, blob_id).await?;
924 let description: ApplicationDescription = bcs::from_bytes(blob_content.bytes())?;
925
926 let blob_ids = self.check_bytecode_blobs(&description.module_id).await?;
927 for blob_id in blob_ids {
930 self.blob_used(txn_tracker, blob_id).await?;
931 }
932
933 self.check_required_applications(&description, txn_tracker)
934 .await?;
935
936 Ok(description)
937 }
938
939 pub async fn find_dependencies(
941 &mut self,
942 mut stack: Vec<ApplicationId>,
943 txn_tracker: &mut TransactionTracker,
944 ) -> Result<Vec<ApplicationId>, ExecutionError> {
945 let mut result = Vec::new();
947 let mut sorted = HashSet::new();
949 let mut seen = HashSet::new();
951
952 while let Some(id) = stack.pop() {
953 if sorted.contains(&id) {
954 continue;
955 }
956 if seen.contains(&id) {
957 sorted.insert(id);
960 result.push(id);
961 continue;
962 }
963 seen.insert(id);
966 stack.push(id);
968 let app = self.describe_application(id, txn_tracker).await?;
969 for child in app.required_application_ids.iter().rev() {
970 if !seen.contains(child) {
971 stack.push(*child);
972 }
973 }
974 }
975 Ok(result)
976 }
977
978 pub(crate) async fn blob_used(
981 &mut self,
982 txn_tracker: &mut TransactionTracker,
983 blob_id: BlobId,
984 ) -> Result<bool, ExecutionError> {
985 if self.used_blobs.contains(&blob_id).await? {
986 return Ok(false); }
988 self.used_blobs.insert(&blob_id)?;
989 txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
990 Ok(true)
991 }
992
993 fn blob_published(
996 &mut self,
997 blob_id: &BlobId,
998 txn_tracker: &mut TransactionTracker,
999 ) -> Result<(), ExecutionError> {
1000 self.used_blobs.insert(blob_id)?;
1001 txn_tracker.add_published_blob(*blob_id);
1002 Ok(())
1003 }
1004
1005 pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1006 match self.context().extra().get_blob(blob_id).await {
1007 Ok(Some(blob)) => Ok(blob.into()),
1008 Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1009 Err(error) => Err(error.into()),
1010 }
1011 }
1012
1013 pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1014 if self.context().extra().contains_blob(blob_id).await? {
1015 Ok(())
1016 } else {
1017 Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1018 }
1019 }
1020
1021 async fn check_bytecode_blobs(
1022 &mut self,
1023 module_id: &ModuleId,
1024 ) -> Result<Vec<BlobId>, ExecutionError> {
1025 let blob_ids = module_id.bytecode_blob_ids();
1026
1027 let mut missing_blobs = Vec::new();
1028 for blob_id in &blob_ids {
1029 if !self.context().extra().contains_blob(*blob_id).await? {
1030 missing_blobs.push(*blob_id);
1031 }
1032 }
1033 ensure!(
1034 missing_blobs.is_empty(),
1035 ExecutionError::BlobsNotFound(missing_blobs)
1036 );
1037
1038 Ok(blob_ids)
1039 }
1040}