1use std::{
7 collections::{BTreeMap, BTreeSet},
8 sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16 data_types::{
17 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18 Timestamp,
19 },
20 ensure, hex_debug, hex_vec_debug, http,
21 identifiers::{
22 Account, AccountOwner, BlobId, BlobType, ChainId, EventId, OwnerSpender, StreamId,
23 },
24 ownership::ChainOwnership,
25 time::Instant,
26};
27use linera_views::{batch::Batch, context::Context, views::View};
28use oneshot::Sender;
29use reqwest::{header::HeaderMap, Client, Url};
30use tracing::{info_span, instrument, Instrument as _};
31
32use crate::{
33 execution::UserAction,
34 runtime::ContractSyncRuntime,
35 system::{CreateApplicationResult, OpenChainConfig},
36 util::{OracleResponseExt as _, RespondExt as _},
37 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
38 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
39 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
40 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
41};
42
/// Actor that executes operations and messages and serves [`ExecutionRequest`]s
/// against a chain's execution state.
///
/// It only borrows the state it works on; all three references are supplied by
/// the caller through [`ExecutionStateActor::new`].
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view (system state and user application states).
    state: &'a mut ExecutionStateView<C>,
    /// Tracker that accumulates outgoing messages, events, created blobs and
    /// oracle responses produced while executing.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to track resource usage (blob reads, events, ...).
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
49
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers an unlabeled latency histogram whose buckets are produced by
    /// `exponential_bucket_latencies(250.0)`.
    fn register_load_latency(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram of the time spent in `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_load_latency("load_contract_latency", "Load contract latency"));

    /// Histogram of the time spent in `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_load_latency("load_service_latency", "Load service latency"));
}
77
/// Sending half of the unbounded channel carrying [`ExecutionRequest`]s to the
/// execution state actor.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
79
80impl<'a, C> ExecutionStateActor<'a, C>
81where
82 C: Context + Clone + 'static,
83 C::Extra: ExecutionRuntimeContext,
84{
85 pub fn new(
87 state: &'a mut ExecutionStateView<C>,
88 txn_tracker: &'a mut TransactionTracker,
89 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
90 ) -> Self {
91 Self {
92 state,
93 txn_tracker,
94 resource_controller,
95 }
96 }
97
98 #[instrument(skip_all, fields(application_id = %id))]
99 pub(crate) async fn load_contract(
100 &mut self,
101 id: ApplicationId,
102 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
103 #[cfg(with_metrics)]
104 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
105 let blob_id = id.description_blob_id();
106 let description = match self.txn_tracker.get_blob_content(&blob_id) {
107 Some(blob) => bcs::from_bytes(blob.bytes())?,
108 None => {
109 self.state
110 .system
111 .describe_application(id, self.txn_tracker)
112 .await?
113 }
114 };
115 let code = self
116 .state
117 .context()
118 .extra()
119 .get_user_contract(&description, self.txn_tracker)
120 .await?;
121 Ok((code, description))
122 }
123
124 pub(crate) async fn load_service(
125 &mut self,
126 id: ApplicationId,
127 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
128 #[cfg(with_metrics)]
129 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
130 let blob_id = id.description_blob_id();
131 let description = match self.txn_tracker.get_blob_content(&blob_id) {
132 Some(blob) => bcs::from_bytes(blob.bytes())?,
133 None => {
134 self.state
135 .system
136 .describe_application(id, self.txn_tracker)
137 .await?
138 }
139 };
140 let code = self
141 .state
142 .context()
143 .extra()
144 .get_user_service(&description, self.txn_tracker)
145 .await?;
146 Ok((code, description))
147 }
148
149 #[instrument(
151 skip_all,
152 fields(request_type = %request.as_ref())
153 )]
154 pub(crate) async fn handle_request(
155 &mut self,
156 request: ExecutionRequest,
157 ) -> Result<(), ExecutionError> {
158 use ExecutionRequest::*;
159 match request {
160 #[cfg(not(web))]
161 LoadContract { id, callback } => {
162 let (code, description) = self.load_contract(id).await?;
163 callback.respond((code, description))
164 }
165 #[cfg(not(web))]
166 LoadService { id, callback } => {
167 let (code, description) = self.load_service(id).await?;
168 callback.respond((code, description))
169 }
170
171 ChainBalance { callback } => {
172 let balance = *self.state.system.balance.get();
173 callback.respond(balance);
174 }
175
176 OwnerBalance { owner, callback } => {
177 let balance = self
178 .state
179 .system
180 .balances
181 .get(&owner)
182 .await?
183 .unwrap_or_default();
184 callback.respond(balance);
185 }
186
187 OwnerBalances { callback } => {
188 callback.respond(self.state.system.balances.index_values().await?);
189 }
190
191 BalanceOwners { callback } => {
192 let owners = self.state.system.balances.indices().await?;
193 callback.respond(owners);
194 }
195
196 Allowance {
197 owner,
198 spender,
199 callback,
200 } => {
201 let owner_spender = OwnerSpender::new(owner, spender);
202 let allowance = self
203 .state
204 .system
205 .allowances
206 .get(&owner_spender)
207 .await?
208 .unwrap_or_default();
209 callback.respond(allowance);
210 }
211
212 Allowances { callback } => {
213 let entries: Vec<_> = self
214 .state
215 .system
216 .allowances
217 .index_values()
218 .await?
219 .into_iter()
220 .map(|(os, amount)| (os.owner, os.spender, amount))
221 .collect();
222 callback.respond(entries);
223 }
224
225 Transfer {
226 source,
227 destination,
228 amount,
229 signer,
230 application_id,
231 callback,
232 } => {
233 let maybe_message = self
234 .state
235 .system
236 .transfer(signer, Some(application_id), source, destination, amount)
237 .await?;
238 self.txn_tracker.add_outgoing_messages(maybe_message);
239 callback.respond(());
240 }
241
242 Claim {
243 source,
244 destination,
245 amount,
246 signer,
247 application_id,
248 callback,
249 } => {
250 let maybe_message = self
251 .state
252 .system
253 .claim(
254 signer,
255 Some(application_id),
256 source.owner,
257 source.chain_id,
258 destination,
259 amount,
260 )
261 .await?;
262 self.txn_tracker.add_outgoing_messages(maybe_message);
263 callback.respond(());
264 }
265
266 Approve {
267 owner,
268 spender,
269 amount,
270 signer,
271 application_id,
272 callback,
273 } => {
274 self.state
275 .system
276 .approve(signer, Some(application_id), owner, spender, amount)
277 .await?;
278 callback.respond(());
279 }
280
281 TransferFrom {
282 owner,
283 spender,
284 destination,
285 amount,
286 signer,
287 application_id,
288 callback,
289 } => {
290 let maybe_message = self
291 .state
292 .system
293 .transfer_from(
294 signer,
295 Some(application_id),
296 owner,
297 spender,
298 destination,
299 amount,
300 )
301 .await?;
302 self.txn_tracker.add_outgoing_messages(maybe_message);
303 callback.respond(());
304 }
305
306 SystemTimestamp { callback } => {
307 let timestamp = *self.state.system.timestamp.get();
308 callback.respond(timestamp);
309 }
310
311 ChainOwnership { callback } => {
312 let ownership = self.state.system.ownership.get().await?.clone();
313 callback.respond(ownership);
314 }
315
316 ApplicationPermissions { callback } => {
317 let permissions = self
318 .state
319 .system
320 .application_permissions
321 .get()
322 .await?
323 .clone();
324 callback.respond(permissions);
325 }
326
327 ReadApplicationDescription {
328 application_id,
329 callback,
330 } => {
331 let blob_id = application_id.description_blob_id();
332 let description = match self.txn_tracker.get_blob_content(&blob_id) {
333 Some(blob) => bcs::from_bytes(blob.bytes())?,
334 None => {
335 let blob_content = self.state.system.read_blob_content(blob_id).await?;
336 self.state
337 .system
338 .blob_used(self.txn_tracker, blob_id)
339 .await?;
340 bcs::from_bytes(blob_content.bytes())?
341 }
342 };
343 callback.respond(description);
344 }
345
346 ContainsKey { id, key, callback } => {
347 let view = self.state.users.try_load_entry(&id).await?;
348 let result = match view {
349 Some(view) => view.contains_key(&key).await?,
350 None => false,
351 };
352 callback.respond(result);
353 }
354
355 ContainsKeys { id, keys, callback } => {
356 let view = self.state.users.try_load_entry(&id).await?;
357 let result = match view {
358 Some(view) => view.contains_keys(&keys).await?,
359 None => vec![false; keys.len()],
360 };
361 callback.respond(result);
362 }
363
364 ReadMultiValuesBytes { id, keys, callback } => {
365 let view = self.state.users.try_load_entry(&id).await?;
366 let values = match view {
367 Some(view) => view.multi_get(&keys).await?,
368 None => vec![None; keys.len()],
369 };
370 callback.respond(values);
371 }
372
373 ReadValueBytes { id, key, callback } => {
374 let view = self.state.users.try_load_entry(&id).await?;
375 let result = match view {
376 Some(view) => view.get(&key).await?,
377 None => None,
378 };
379 callback.respond(result);
380 }
381
382 FindKeysByPrefix {
383 id,
384 key_prefix,
385 callback,
386 } => {
387 let view = self.state.users.try_load_entry(&id).await?;
388 let result = match view {
389 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
390 None => Vec::new(),
391 };
392 callback.respond(result);
393 }
394
395 FindKeyValuesByPrefix {
396 id,
397 key_prefix,
398 callback,
399 } => {
400 let view = self.state.users.try_load_entry(&id).await?;
401 let result = match view {
402 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
403 None => Vec::new(),
404 };
405 callback.respond(result);
406 }
407
408 WriteBatch {
409 id,
410 batch,
411 callback,
412 } => {
413 let mut view = self.state.users.try_load_entry_mut(&id).await?;
414 view.write_batch(batch).await?;
415 callback.respond(());
416 }
417
418 OpenChain {
419 ownership,
420 balance,
421 parent_id,
422 block_height,
423 application_permissions,
424 timestamp,
425 callback,
426 } => {
427 let config = OpenChainConfig {
428 ownership,
429 balance,
430 application_permissions,
431 };
432 let chain_id = self
433 .state
434 .system
435 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
436 .await?;
437 callback.respond(chain_id);
438 }
439
440 CloseChain {
441 application_id,
442 callback,
443 } => {
444 let app_permissions = self.state.system.application_permissions.get().await?;
445 if !app_permissions.can_manage_chain(&application_id) {
446 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
447 } else {
448 self.state.system.close_chain();
449 callback.respond(Ok(()));
450 }
451 }
452
453 ChangeOwnership {
454 application_id,
455 ownership,
456 callback,
457 } => {
458 let app_permissions = self.state.system.application_permissions.get().await?;
459 if !app_permissions.can_manage_chain(&application_id) {
460 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
461 } else {
462 self.state.system.ownership.set(ownership);
463 callback.respond(Ok(()));
464 }
465 }
466
467 ChangeApplicationPermissions {
468 application_id,
469 application_permissions,
470 callback,
471 } => {
472 let app_permissions = self.state.system.application_permissions.get().await?;
473 if !app_permissions.can_manage_chain(&application_id) {
474 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
475 } else {
476 self.state
477 .system
478 .application_permissions
479 .set(application_permissions);
480 callback.respond(Ok(()));
481 }
482 }
483
484 PeekApplicationIndex { callback } => {
485 let index = self.txn_tracker.peek_application_index();
486 callback.respond(index)
487 }
488
489 CreateApplication {
490 chain_id,
491 block_height,
492 module_id,
493 parameters,
494 required_application_ids,
495 callback,
496 } => {
497 let create_application_result = self
498 .state
499 .system
500 .create_application(
501 chain_id,
502 block_height,
503 module_id,
504 parameters,
505 required_application_ids,
506 self.txn_tracker,
507 )
508 .await?;
509 callback.respond(create_application_result);
510 }
511
512 PerformHttpRequest {
513 request,
514 http_responses_are_oracle_responses,
515 callback,
516 } => {
517 let system = &mut self.state.system;
518 let response = self
519 .txn_tracker
520 .oracle(|| async {
521 let headers = request
522 .headers
523 .into_iter()
524 .map(|http::Header { name, value }| {
525 Ok((name.parse()?, value.try_into()?))
526 })
527 .collect::<Result<HeaderMap, ExecutionError>>()?;
528
529 let url = Url::parse(&request.url)?;
530 let host = url
531 .host_str()
532 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
533
534 let (_epoch, committee) = system
535 .current_committee()
536 .await?
537 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
538 let allowed_hosts = &committee.policy().http_request_allow_list;
539
540 ensure!(
541 allowed_hosts.contains(host),
542 ExecutionError::UnauthorizedHttpRequest(url)
543 );
544
545 let request = Client::new()
546 .request(request.method.into(), url)
547 .body(request.body)
548 .headers(headers);
549 #[cfg(not(web))]
550 let request = request.timeout(linera_base::time::Duration::from_millis(
551 committee.policy().http_request_timeout_ms,
552 ));
553
554 let response = request.send().await?;
555
556 let mut response_size_limit =
557 committee.policy().maximum_http_response_bytes;
558
559 if http_responses_are_oracle_responses {
560 response_size_limit = response_size_limit
561 .min(committee.policy().maximum_oracle_response_bytes);
562 }
563 Ok(OracleResponse::Http(
564 Self::receive_http_response(response, response_size_limit).await?,
565 ))
566 })
567 .await?
568 .to_http_response()?;
569 callback.respond(response);
570 }
571
572 ReadBlobContent { blob_id, callback } => {
573 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
574 content.clone()
575 } else {
576 let content = self.state.system.read_blob_content(blob_id).await?;
577 if blob_id.blob_type == BlobType::Data {
578 self.resource_controller
579 .with_state(&mut self.state.system)
580 .await?
581 .track_blob_read(content.bytes().len() as u64)?;
582 }
583 self.state
584 .system
585 .blob_used(self.txn_tracker, blob_id)
586 .await?;
587 content
588 };
589 callback.respond(content)
590 }
591
592 AssertBlobExists { blob_id, callback } => {
593 self.state.system.assert_blob_exists(blob_id).await?;
594 if blob_id.blob_type == BlobType::Data {
596 self.resource_controller
597 .with_state(&mut self.state.system)
598 .await?
599 .track_blob_read(0)?;
600 }
601 let is_new = self
602 .state
603 .system
604 .blob_used(self.txn_tracker, blob_id)
605 .await?;
606 if is_new {
607 self.txn_tracker
608 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
609 }
610 callback.respond(());
611 }
612
613 Emit {
614 stream_id,
615 value,
616 callback,
617 } => {
618 let count = self
619 .state
620 .system
621 .stream_event_counts
622 .get_mut_or_default(&stream_id)
623 .await?;
624 let index = *count;
625 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
626 self.resource_controller
627 .with_state(&mut self.state.system)
628 .await?
629 .track_event_published(&value)?;
630 self.txn_tracker.add_event(stream_id, index, value);
631 callback.respond(index)
632 }
633
634 ReadEvent { event_id, callback } => {
635 let context = self.state.context();
636 let extra = context.extra();
637 let event = self
638 .txn_tracker
639 .oracle(|| async {
640 let event = extra
641 .get_event(event_id.clone())
642 .await?
643 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
644 Ok(OracleResponse::Event(
645 event_id.clone(),
646 Arc::unwrap_or_clone(event),
647 ))
648 })
649 .await?
650 .to_event(&event_id)?;
651 self.resource_controller
652 .with_state(&mut self.state.system)
653 .await?
654 .track_event_read(event.len() as u64)?;
655 callback.respond(event);
656 }
657
658 SubscribeToEvents {
659 chain_id,
660 stream_id,
661 subscriber_app_id,
662 callback,
663 } => {
664 let subscriptions = self
665 .state
666 .system
667 .event_subscriptions
668 .get_mut_or_default(&(chain_id, stream_id.clone()))
669 .await?;
670 let next_index = match subscriptions.applications.entry(subscriber_app_id) {
671 std::collections::btree_map::Entry::Vacant(entry) => {
672 entry.insert(0);
673 subscriptions.min_next_index = 0;
674 0
675 }
676 std::collections::btree_map::Entry::Occupied(entry) => *entry.get(),
677 };
678 self.txn_tracker.add_stream_to_process(
679 subscriber_app_id,
680 chain_id,
681 stream_id,
682 0,
683 next_index,
684 );
685 callback.respond(());
686 }
687
688 UnsubscribeFromEvents {
689 chain_id,
690 stream_id,
691 subscriber_app_id,
692 callback,
693 } => {
694 let key = (chain_id, stream_id.clone());
695 let subscriptions = self
696 .state
697 .system
698 .event_subscriptions
699 .get_mut_or_default(&key)
700 .await?;
701 subscriptions.applications.remove(&subscriber_app_id);
702 if subscriptions.applications.is_empty() {
703 self.state.system.event_subscriptions.remove(&key)?;
704 } else {
705 subscriptions.recalculate_min();
706 }
707 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
708 self.txn_tracker
709 .remove_stream_to_process(app_id, chain_id, stream_id);
710 }
711 callback.respond(());
712 }
713
714 GetApplicationPermissions { callback } => {
715 let app_permissions = self.state.system.application_permissions.get().await?;
716 callback.respond(app_permissions.clone());
717 }
718
719 QueryServiceOracle {
720 deadline,
721 application_id,
722 next_block_height,
723 query,
724 callback,
725 } => {
726 let state = &mut self.state;
727 let local_time = self.txn_tracker.local_time();
728 let created_blobs = self.txn_tracker.created_blobs().clone();
729 let bytes = self
730 .txn_tracker
731 .oracle(|| async {
732 let context = QueryContext {
733 chain_id: state.context().extra().chain_id(),
734 next_block_height,
735 local_time,
736 };
737 let QueryOutcome {
738 response,
739 operations,
740 } = Box::pin(state.query_user_application_with_deadline(
741 application_id,
742 context,
743 query,
744 deadline,
745 created_blobs,
746 ))
747 .await?;
748 ensure!(
749 operations.is_empty(),
750 ExecutionError::ServiceOracleQueryOperations(operations)
751 );
752 Ok(OracleResponse::Service(response))
753 })
754 .await?
755 .to_service_response()?;
756 callback.respond(bytes);
757 }
758
759 AddOutgoingMessage { message, callback } => {
760 self.txn_tracker.add_outgoing_message(message);
761 callback.respond(());
762 }
763
764 SetLocalTime {
765 local_time,
766 callback,
767 } => {
768 self.txn_tracker.set_local_time(local_time);
769 callback.respond(());
770 }
771
772 AssertBefore {
773 timestamp,
774 callback,
775 } => {
776 let result = if !self
777 .txn_tracker
778 .replay_oracle_response(OracleResponse::Assert)?
779 {
780 let local_time = self.txn_tracker.local_time();
782 if local_time >= timestamp {
783 Err(ExecutionError::AssertBefore {
784 timestamp,
785 local_time,
786 })
787 } else {
788 Ok(())
789 }
790 } else {
791 Ok(())
792 };
793 callback.respond(result);
794 }
795
796 AddCreatedBlob { blob, callback } => {
797 if self.resource_controller.is_free {
798 self.txn_tracker.mark_blob_free(blob.id());
799 }
800 self.txn_tracker.add_created_blob(blob);
801 callback.respond(());
802 }
803
804 ValidationRound { round, callback } => {
805 let validation_round = self
806 .txn_tracker
807 .oracle(|| async { Ok(OracleResponse::Round(round)) })
808 .await?
809 .to_round()?;
810 callback.respond(validation_round);
811 }
812
813 TotalStorageSize {
814 application,
815 callback,
816 } => {
817 let view = self.state.users.try_load_entry(&application).await?;
818 let result = match view {
819 Some(view) => {
820 let total_size = view.total_size();
821 (total_size.key, total_size.value)
822 }
823 None => (0, 0),
824 };
825 callback.respond(result);
826 }
827
828 AllowApplicationLogs { callback } => {
829 let allow = self
830 .state
831 .context()
832 .extra()
833 .execution_runtime_config()
834 .allow_application_logs;
835 callback.respond(allow);
836 }
837
838 #[cfg(web)]
839 Log { message, level } => match level {
840 tracing::log::Level::Trace | tracing::log::Level::Debug => {
841 tracing::debug!(target: "user_application_log", message = %message);
842 }
843 tracing::log::Level::Info => {
844 tracing::info!(target: "user_application_log", message = %message);
845 }
846 tracing::log::Level::Warn => {
847 tracing::warn!(target: "user_application_log", message = %message);
848 }
849 tracing::log::Level::Error => {
850 tracing::error!(target: "user_application_log", message = %message);
851 }
852 },
853 }
854
855 Ok(())
856 }
857
858 #[instrument(skip_all)]
861 async fn process_subscriptions(
862 &mut self,
863 context: ProcessStreamsContext,
864 ) -> Result<(), ExecutionError> {
865 let mut processed = BTreeSet::new();
868 loop {
869 let to_process = self
870 .txn_tracker
871 .take_streams_to_process()
872 .into_iter()
873 .filter_map(|(app_id, updates)| {
874 let updates = updates
875 .into_iter()
876 .filter_map(|update| {
877 if !processed.insert((
878 app_id,
879 update.chain_id,
880 update.stream_id.clone(),
881 )) {
882 return None;
883 }
884 Some(update)
885 })
886 .collect::<Vec<_>>();
887 if updates.is_empty() {
888 return None;
889 }
890 Some((app_id, updates))
891 })
892 .collect::<BTreeMap<_, _>>();
893 if to_process.is_empty() {
894 return Ok(());
895 }
896 for (app_id, updates) in to_process {
897 self.run_user_action(
898 app_id,
899 UserAction::ProcessStreams(context, updates),
900 None,
901 None,
902 )
903 .await?;
904 }
905 }
906 }
907
908 pub(crate) async fn run_user_action(
909 &mut self,
910 application_id: ApplicationId,
911 action: UserAction,
912 refund_grant_to: Option<Account>,
913 grant: Option<&mut Amount>,
914 ) -> Result<(), ExecutionError> {
915 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
916 .await
917 }
918
919 pub(crate) async fn service_and_dependencies(
921 &mut self,
922 application: ApplicationId,
923 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
924 let mut stack = vec![application];
927 let mut codes = vec![];
928 let mut descriptions = vec![];
929
930 while let Some(id) = stack.pop() {
931 let (code, description) = self.load_service(id).await?;
932 stack.extend(description.required_application_ids.iter().rev().copied());
933 codes.push(code);
934 descriptions.push(description);
935 }
936
937 codes.reverse();
938 descriptions.reverse();
939
940 Ok((codes, descriptions))
941 }
942
943 #[instrument(skip_all, fields(application_id = %application))]
945 async fn contract_and_dependencies(
946 &mut self,
947 application: ApplicationId,
948 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
949 let mut stack = vec![application];
952 let mut codes = vec![];
953 let mut descriptions = vec![];
954
955 while let Some(id) = stack.pop() {
956 let (code, description) = self.load_contract(id).await?;
957 stack.extend(description.required_application_ids.iter().rev().copied());
958 codes.push(code);
959 descriptions.push(description);
960 }
961
962 codes.reverse();
963 descriptions.reverse();
964
965 Ok((codes, descriptions))
966 }
967
968 #[instrument(skip_all, fields(application_id = %application_id))]
969 async fn run_user_action_with_runtime(
970 &mut self,
971 application_id: ApplicationId,
972 action: UserAction,
973 refund_grant_to: Option<Account>,
974 grant: Option<&mut Amount>,
975 ) -> Result<(), ExecutionError> {
976 let chain_id = self.state.context().extra().chain_id();
977 let mut cloned_grant = grant.as_ref().map(|x| **x);
978 let initial_balance = self
979 .resource_controller
980 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
981 .await?
982 .balance()?;
983 let mut controller = ResourceController::new(
984 self.resource_controller.policy().clone(),
985 self.resource_controller.tracker,
986 initial_balance,
987 );
988 let is_free = matches!(
989 &action,
990 UserAction::Message(..) | UserAction::ProcessStreams(..)
991 ) && self
992 .resource_controller
993 .policy()
994 .is_free_app(&application_id);
995 controller.is_free = is_free;
996 self.resource_controller.is_free = is_free;
997 let (execution_state_sender, mut execution_state_receiver) =
998 futures::channel::mpsc::unbounded();
999
1000 let (codes, descriptions): (Vec<_>, Vec<_>) =
1001 self.contract_and_dependencies(application_id).await?;
1002
1003 let allow_application_logs = self
1004 .state
1005 .context()
1006 .extra()
1007 .execution_runtime_config()
1008 .allow_application_logs;
1009
1010 let contract_runtime_task = self
1011 .state
1012 .context()
1013 .extra()
1014 .thread_pool()
1015 .run_send(JsVec(codes), move |codes| async move {
1016 let runtime = ContractSyncRuntime::new(
1017 execution_state_sender,
1018 chain_id,
1019 refund_grant_to,
1020 controller,
1021 &action,
1022 allow_application_logs,
1023 );
1024
1025 for (code, description) in codes.0.into_iter().zip(descriptions) {
1026 runtime.preload_contract(ApplicationId::from(&description), code, description);
1027 }
1028
1029 runtime.run_action(application_id, chain_id, action)
1030 })
1031 .await;
1032
1033 async {
1034 while let Some(request) = execution_state_receiver.next().await {
1035 self.handle_request(request).await?;
1036 }
1037 Ok::<(), ExecutionError>(())
1038 }
1039 .instrument(info_span!("handle_runtime_requests"))
1040 .await?;
1041
1042 let (result, controller) = contract_runtime_task.await??;
1043
1044 self.resource_controller.is_free = false;
1045
1046 self.txn_tracker.add_operation_result(result);
1047
1048 self.resource_controller
1049 .with_state_and_grant(&mut self.state.system, grant)
1050 .await?
1051 .merge_balance(initial_balance, controller.balance()?)?;
1052 self.resource_controller.tracker = controller.tracker;
1053
1054 Ok(())
1055 }
1056
1057 #[instrument(skip_all, fields(
1058 chain_id = %context.chain_id,
1059 block_height = %context.height,
1060 operation_type = %operation.as_ref(),
1061 ))]
1062 pub async fn execute_operation(
1063 &mut self,
1064 context: OperationContext,
1065 operation: Operation,
1066 ) -> Result<(), ExecutionError> {
1067 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1068 match operation {
1069 Operation::System(op) => {
1070 let new_application = self
1071 .state
1072 .system
1073 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
1074 .await?;
1075 if let Some((application_id, argument)) = new_application {
1076 let user_action = UserAction::Instantiate(context, argument);
1077 self.run_user_action(
1078 application_id,
1079 user_action,
1080 context.refund_grant_to(),
1081 None,
1082 )
1083 .await?;
1084 }
1085 }
1086 Operation::User {
1087 application_id,
1088 bytes,
1089 } => {
1090 self.run_user_action(
1091 application_id,
1092 UserAction::Operation(context, bytes),
1093 context.refund_grant_to(),
1094 None,
1095 )
1096 .await?;
1097 }
1098 }
1099 self.process_subscriptions(context.into()).await?;
1100 Ok(())
1101 }
1102
1103 #[instrument(skip_all, fields(
1104 chain_id = %context.chain_id,
1105 block_height = %context.height,
1106 origin = %context.origin,
1107 is_bouncing = %context.is_bouncing,
1108 message_type = %message.as_ref(),
1109 ))]
1110 pub async fn execute_message(
1111 &mut self,
1112 context: MessageContext,
1113 message: Message,
1114 grant: Option<&mut Amount>,
1115 ) -> Result<(), ExecutionError> {
1116 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1117 match message {
1118 Message::System(message) => {
1119 let outcome = self.state.system.execute_message(context, message).await?;
1120 self.txn_tracker.add_outgoing_messages(outcome);
1121 }
1122 Message::User {
1123 application_id,
1124 bytes,
1125 } => {
1126 self.run_user_action(
1127 application_id,
1128 UserAction::Message(context, bytes),
1129 context.refund_grant_to,
1130 grant,
1131 )
1132 .await?;
1133 }
1134 }
1135 self.process_subscriptions(context.into()).await?;
1136 Ok(())
1137 }
1138
1139 pub fn bounce_message(
1140 &mut self,
1141 context: MessageContext,
1142 grant: Amount,
1143 message: Message,
1144 ) -> Result<(), ExecutionError> {
1145 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1146 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1147 destination: context.origin,
1148 authenticated_owner: context.authenticated_owner,
1149 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1150 grant,
1151 kind: MessageKind::Bouncing,
1152 message,
1153 });
1154 Ok(())
1155 }
1156
1157 pub fn send_refund(
1158 &mut self,
1159 context: MessageContext,
1160 amount: Amount,
1161 ) -> Result<(), ExecutionError> {
1162 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1163 if amount.is_zero() {
1164 return Ok(());
1165 }
1166 let Some(account) = context.refund_grant_to else {
1167 return Err(ExecutionError::InternalError(
1168 "Messages with grants should have a non-empty `refund_grant_to`",
1169 ));
1170 };
1171 let message = SystemMessage::Credit {
1172 amount,
1173 source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1174 target: account.owner,
1175 };
1176 self.txn_tracker.add_outgoing_message(
1177 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1178 );
1179 Ok(())
1180 }
1181
1182 async fn receive_http_response(
1186 response: reqwest::Response,
1187 size_limit: u64,
1188 ) -> Result<http::Response, ExecutionError> {
1189 let status = response.status().as_u16();
1190 let maybe_content_length = response.content_length();
1191
1192 let headers = response
1193 .headers()
1194 .iter()
1195 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1196 .collect::<Vec<_>>();
1197
1198 let total_header_size = headers
1199 .iter()
1200 .map(|header| (header.name.len() + header.value.len()) as u64)
1201 .sum();
1202
1203 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1204 ExecutionError::HttpResponseSizeLimitExceeded {
1205 limit: size_limit,
1206 size: total_header_size,
1207 },
1208 )?;
1209
1210 if let Some(content_length) = maybe_content_length {
1211 if content_length > remaining_bytes {
1212 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1213 limit: size_limit,
1214 size: content_length + total_header_size,
1215 });
1216 }
1217 }
1218
1219 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1220 let mut body_stream = response.bytes_stream();
1221
1222 while let Some(bytes) = body_stream.next().await.transpose()? {
1223 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1224 ExecutionError::HttpResponseSizeLimitExceeded {
1225 limit: size_limit,
1226 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1227 },
1228 )?;
1229
1230 body.extend(&bytes);
1231 }
1232
1233 Ok(http::Response {
1234 status,
1235 headers,
1236 body,
1237 })
1238 }
1239}
1240
1241#[derive(Debug, strum::AsRefStr)]
1243pub enum ExecutionRequest {
1244 #[cfg(not(web))]
1245 LoadContract {
1246 id: ApplicationId,
1247 #[debug(skip)]
1248 callback: Sender<(UserContractCode, ApplicationDescription)>,
1249 },
1250
1251 #[cfg(not(web))]
1252 LoadService {
1253 id: ApplicationId,
1254 #[debug(skip)]
1255 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1256 },
1257
1258 ChainBalance {
1259 #[debug(skip)]
1260 callback: Sender<Amount>,
1261 },
1262
1263 OwnerBalance {
1264 owner: AccountOwner,
1265 #[debug(skip)]
1266 callback: Sender<Amount>,
1267 },
1268
1269 OwnerBalances {
1270 #[debug(skip)]
1271 callback: Sender<Vec<(AccountOwner, Amount)>>,
1272 },
1273
1274 BalanceOwners {
1275 #[debug(skip)]
1276 callback: Sender<Vec<AccountOwner>>,
1277 },
1278
1279 Allowance {
1280 owner: AccountOwner,
1281 spender: AccountOwner,
1282 #[debug(skip)]
1283 callback: Sender<Amount>,
1284 },
1285
1286 Allowances {
1287 #[debug(skip)]
1288 callback: Sender<Vec<(AccountOwner, AccountOwner, Amount)>>,
1289 },
1290
1291 Transfer {
1292 source: AccountOwner,
1293 destination: Account,
1294 amount: Amount,
1295 #[debug(skip_if = Option::is_none)]
1296 signer: Option<AccountOwner>,
1297 application_id: ApplicationId,
1298 #[debug(skip)]
1299 callback: Sender<()>,
1300 },
1301
1302 Claim {
1303 source: Account,
1304 destination: Account,
1305 amount: Amount,
1306 #[debug(skip_if = Option::is_none)]
1307 signer: Option<AccountOwner>,
1308 application_id: ApplicationId,
1309 #[debug(skip)]
1310 callback: Sender<()>,
1311 },
1312
1313 Approve {
1314 owner: AccountOwner,
1315 spender: AccountOwner,
1316 amount: Amount,
1317 #[debug(skip_if = Option::is_none)]
1318 signer: Option<AccountOwner>,
1319 application_id: ApplicationId,
1320 #[debug(skip)]
1321 callback: Sender<()>,
1322 },
1323
1324 TransferFrom {
1325 owner: AccountOwner,
1326 spender: AccountOwner,
1327 destination: Account,
1328 amount: Amount,
1329 #[debug(skip_if = Option::is_none)]
1330 signer: Option<AccountOwner>,
1331 application_id: ApplicationId,
1332 #[debug(skip)]
1333 callback: Sender<()>,
1334 },
1335
1336 SystemTimestamp {
1337 #[debug(skip)]
1338 callback: Sender<Timestamp>,
1339 },
1340
1341 ChainOwnership {
1342 #[debug(skip)]
1343 callback: Sender<ChainOwnership>,
1344 },
1345
1346 ApplicationPermissions {
1347 #[debug(skip)]
1348 callback: Sender<ApplicationPermissions>,
1349 },
1350
1351 ReadApplicationDescription {
1352 application_id: ApplicationId,
1353 #[debug(skip)]
1354 callback: Sender<ApplicationDescription>,
1355 },
1356
1357 ReadValueBytes {
1358 id: ApplicationId,
1359 #[debug(with = hex_debug)]
1360 key: Vec<u8>,
1361 #[debug(skip)]
1362 callback: Sender<Option<Vec<u8>>>,
1363 },
1364
1365 ContainsKey {
1366 id: ApplicationId,
1367 key: Vec<u8>,
1368 #[debug(skip)]
1369 callback: Sender<bool>,
1370 },
1371
1372 ContainsKeys {
1373 id: ApplicationId,
1374 #[debug(with = hex_vec_debug)]
1375 keys: Vec<Vec<u8>>,
1376 callback: Sender<Vec<bool>>,
1377 },
1378
1379 ReadMultiValuesBytes {
1380 id: ApplicationId,
1381 #[debug(with = hex_vec_debug)]
1382 keys: Vec<Vec<u8>>,
1383 #[debug(skip)]
1384 callback: Sender<Vec<Option<Vec<u8>>>>,
1385 },
1386
1387 FindKeysByPrefix {
1388 id: ApplicationId,
1389 #[debug(with = hex_debug)]
1390 key_prefix: Vec<u8>,
1391 #[debug(skip)]
1392 callback: Sender<Vec<Vec<u8>>>,
1393 },
1394
1395 FindKeyValuesByPrefix {
1396 id: ApplicationId,
1397 #[debug(with = hex_debug)]
1398 key_prefix: Vec<u8>,
1399 #[debug(skip)]
1400 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1401 },
1402
1403 WriteBatch {
1404 id: ApplicationId,
1405 batch: Batch,
1406 #[debug(skip)]
1407 callback: Sender<()>,
1408 },
1409
1410 OpenChain {
1411 ownership: ChainOwnership,
1412 #[debug(skip_if = Amount::is_zero)]
1413 balance: Amount,
1414 parent_id: ChainId,
1415 block_height: BlockHeight,
1416 application_permissions: ApplicationPermissions,
1417 timestamp: Timestamp,
1418 #[debug(skip)]
1419 callback: Sender<ChainId>,
1420 },
1421
1422 CloseChain {
1423 application_id: ApplicationId,
1424 #[debug(skip)]
1425 callback: Sender<Result<(), ExecutionError>>,
1426 },
1427
1428 ChangeOwnership {
1429 application_id: ApplicationId,
1430 ownership: ChainOwnership,
1431 #[debug(skip)]
1432 callback: Sender<Result<(), ExecutionError>>,
1433 },
1434
1435 ChangeApplicationPermissions {
1436 application_id: ApplicationId,
1437 application_permissions: ApplicationPermissions,
1438 #[debug(skip)]
1439 callback: Sender<Result<(), ExecutionError>>,
1440 },
1441
1442 PeekApplicationIndex {
1443 #[debug(skip)]
1444 callback: Sender<u32>,
1445 },
1446
1447 CreateApplication {
1448 chain_id: ChainId,
1449 block_height: BlockHeight,
1450 module_id: ModuleId,
1451 parameters: Vec<u8>,
1452 required_application_ids: Vec<ApplicationId>,
1453 #[debug(skip)]
1454 callback: Sender<CreateApplicationResult>,
1455 },
1456
1457 PerformHttpRequest {
1458 request: http::Request,
1459 http_responses_are_oracle_responses: bool,
1460 #[debug(skip)]
1461 callback: Sender<http::Response>,
1462 },
1463
1464 ReadBlobContent {
1465 blob_id: BlobId,
1466 #[debug(skip)]
1467 callback: Sender<BlobContent>,
1468 },
1469
1470 AssertBlobExists {
1471 blob_id: BlobId,
1472 #[debug(skip)]
1473 callback: Sender<()>,
1474 },
1475
1476 Emit {
1477 stream_id: StreamId,
1478 #[debug(with = hex_debug)]
1479 value: Vec<u8>,
1480 #[debug(skip)]
1481 callback: Sender<u32>,
1482 },
1483
1484 ReadEvent {
1485 event_id: EventId,
1486 callback: oneshot::Sender<Vec<u8>>,
1487 },
1488
1489 SubscribeToEvents {
1490 chain_id: ChainId,
1491 stream_id: StreamId,
1492 subscriber_app_id: ApplicationId,
1493 #[debug(skip)]
1494 callback: Sender<()>,
1495 },
1496
1497 UnsubscribeFromEvents {
1498 chain_id: ChainId,
1499 stream_id: StreamId,
1500 subscriber_app_id: ApplicationId,
1501 #[debug(skip)]
1502 callback: Sender<()>,
1503 },
1504
1505 GetApplicationPermissions {
1506 #[debug(skip)]
1507 callback: Sender<ApplicationPermissions>,
1508 },
1509
1510 QueryServiceOracle {
1511 deadline: Option<Instant>,
1512 application_id: ApplicationId,
1513 next_block_height: BlockHeight,
1514 query: Vec<u8>,
1515 #[debug(skip)]
1516 callback: Sender<Vec<u8>>,
1517 },
1518
1519 AddOutgoingMessage {
1520 message: crate::OutgoingMessage,
1521 #[debug(skip)]
1522 callback: Sender<()>,
1523 },
1524
1525 SetLocalTime {
1526 local_time: Timestamp,
1527 #[debug(skip)]
1528 callback: Sender<()>,
1529 },
1530
1531 AssertBefore {
1532 timestamp: Timestamp,
1533 #[debug(skip)]
1534 callback: Sender<Result<(), ExecutionError>>,
1535 },
1536
1537 AddCreatedBlob {
1538 blob: crate::Blob,
1539 #[debug(skip)]
1540 callback: Sender<()>,
1541 },
1542
1543 ValidationRound {
1544 round: Option<u32>,
1545 #[debug(skip)]
1546 callback: Sender<Option<u32>>,
1547 },
1548
1549 TotalStorageSize {
1550 application: ApplicationId,
1551 #[debug(skip)]
1552 callback: Sender<(u32, u32)>,
1553 },
1554
1555 AllowApplicationLogs {
1556 #[debug(skip)]
1557 callback: Sender<bool>,
1558 },
1559
1560 #[cfg(web)]
1562 Log {
1563 message: String,
1564 level: tracing::log::Level,
1565 },
1566}