1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{
19 Account, AccountOwner, BlobId, BlobType, ChainId, EventId, OwnerSpender, StreamId,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23};
24use linera_views::{batch::Batch, context::Context, views::View};
25use oneshot::Sender;
26use reqwest::{header::HeaderMap, Client, Url};
27use tracing::{info_span, instrument, Instrument as _};
28
29use crate::{
30 execution::UserAction,
31 runtime::ContractSyncRuntime,
32 system::{CreateApplicationResult, OpenChainConfig},
33 util::{OracleResponseExt as _, RespondExt as _},
34 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
35 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
36 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
37 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
38};
39
/// Actor that serves [`ExecutionRequest`]s against a chain's execution state.
///
/// It borrows everything it needs mutably for the lifetime `'a`; it owns none of the state.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker that collects the transaction's effects (outgoing messages, events, created
    /// blobs, oracle responses, streams to process).
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to account for resources such as blob reads and published/read events.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
46
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers an unlabelled latency histogram with the bucket layout shared by every
    /// metric in this module.
    fn register_load_latency(name: &str, description: &str) -> HistogramVec {
        let buckets = exponential_bucket_latencies(250.0);
        register_histogram_vec(name, description, &[], buckets)
    }

    /// Histogram measured while loading a contract (see `load_contract`).
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_load_latency("load_contract_latency", "Load contract latency"));

    /// Histogram measured while loading a service (see `load_service`).
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_load_latency("load_service_latency", "Load service latency"));
}
74
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s to the actor.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
76
77impl<'a, C> ExecutionStateActor<'a, C>
78where
79 C: Context + Clone + 'static,
80 C::Extra: ExecutionRuntimeContext,
81{
82 pub fn new(
84 state: &'a mut ExecutionStateView<C>,
85 txn_tracker: &'a mut TransactionTracker,
86 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
87 ) -> Self {
88 Self {
89 state,
90 txn_tracker,
91 resource_controller,
92 }
93 }
94
95 #[instrument(skip_all, fields(application_id = %id))]
96 pub(crate) async fn load_contract(
97 &mut self,
98 id: ApplicationId,
99 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
100 #[cfg(with_metrics)]
101 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
102 let blob_id = id.description_blob_id();
103 let description = match self.txn_tracker.get_blob_content(&blob_id) {
104 Some(blob) => bcs::from_bytes(blob.bytes())?,
105 None => {
106 self.state
107 .system
108 .describe_application(id, self.txn_tracker)
109 .await?
110 }
111 };
112 let code = self
113 .state
114 .context()
115 .extra()
116 .get_user_contract(&description, self.txn_tracker)
117 .await?;
118 Ok((code, description))
119 }
120
121 pub(crate) async fn load_service(
122 &mut self,
123 id: ApplicationId,
124 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
125 #[cfg(with_metrics)]
126 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
127 let blob_id = id.description_blob_id();
128 let description = match self.txn_tracker.get_blob_content(&blob_id) {
129 Some(blob) => bcs::from_bytes(blob.bytes())?,
130 None => {
131 self.state
132 .system
133 .describe_application(id, self.txn_tracker)
134 .await?
135 }
136 };
137 let code = self
138 .state
139 .context()
140 .extra()
141 .get_user_service(&description, self.txn_tracker)
142 .await?;
143 Ok((code, description))
144 }
145
    /// Serves a single [`ExecutionRequest`].
    ///
    /// Each arm performs the requested read or mutation on the execution state, the
    /// transaction tracker or the resource controller, and (for every variant that carries
    /// one) answers through the request's `callback`.
    ///
    /// # Errors
    ///
    /// Any error propagated with `?` aborts the request and is returned to the caller without
    /// the callback being answered. A few variants (`CloseChain`, `ChangeOwnership`,
    /// `ChangeApplicationPermissions`, `AssertBefore`) instead report their failure *through*
    /// the callback and still return `Ok(())` here.
    #[instrument(
        skip_all,
        fields(request_type = %request.as_ref())
    )]
    pub(crate) async fn handle_request(
        &mut self,
        request: ExecutionRequest,
    ) -> Result<(), ExecutionError> {
        use ExecutionRequest::*;
        match request {
            // --- Application code loading (not compiled for the `web` configuration) ---
            #[cfg(not(web))]
            LoadContract { id, callback } => {
                let (code, description) = self.load_contract(id).await?;
                callback.respond((code, description))
            }
            #[cfg(not(web))]
            LoadService { id, callback } => {
                let (code, description) = self.load_service(id).await?;
                callback.respond((code, description))
            }

            // --- Balance and allowance queries ---
            ChainBalance { callback } => {
                let balance = *self.state.system.balance.get();
                callback.respond(balance);
            }

            OwnerBalance { owner, callback } => {
                // A missing entry is reported as the default amount.
                let balance = self
                    .state
                    .system
                    .balances
                    .get(&owner)
                    .await?
                    .unwrap_or_default();
                callback.respond(balance);
            }

            OwnerBalances { callback } => {
                callback.respond(self.state.system.balances.index_values().await?);
            }

            BalanceOwners { callback } => {
                let owners = self.state.system.balances.indices().await?;
                callback.respond(owners);
            }

            Allowance {
                owner,
                spender,
                callback,
            } => {
                let owner_spender = OwnerSpender::new(owner, spender);
                // A missing entry is reported as the default amount.
                let allowance = self
                    .state
                    .system
                    .allowances
                    .get(&owner_spender)
                    .await?
                    .unwrap_or_default();
                callback.respond(allowance);
            }

            Allowances { callback } => {
                // Flatten each `(OwnerSpender, amount)` entry into an `(owner, spender, amount)`
                // triple.
                let entries: Vec<_> = self
                    .state
                    .system
                    .allowances
                    .index_values()
                    .await?
                    .into_iter()
                    .map(|(os, amount)| (os.owner, os.spender, amount))
                    .collect();
                callback.respond(entries);
            }

            // --- Token movements performed on behalf of an application ---
            Transfer {
                source,
                destination,
                amount,
                signer,
                application_id,
                callback,
            } => {
                let maybe_message = self
                    .state
                    .system
                    .transfer(signer, Some(application_id), source, destination, amount)
                    .await?;
                // Whatever message(s) the transfer produced are queued as outgoing.
                self.txn_tracker.add_outgoing_messages(maybe_message);
                callback.respond(());
            }

            Claim {
                source,
                destination,
                amount,
                signer,
                application_id,
                callback,
            } => {
                let maybe_message = self
                    .state
                    .system
                    .claim(
                        signer,
                        Some(application_id),
                        source.owner,
                        source.chain_id,
                        destination,
                        amount,
                    )
                    .await?;
                self.txn_tracker.add_outgoing_messages(maybe_message);
                callback.respond(());
            }

            Approve {
                owner,
                spender,
                amount,
                signer,
                application_id,
                callback,
            } => {
                self.state
                    .system
                    .approve(signer, Some(application_id), owner, spender, amount)
                    .await?;
                callback.respond(());
            }

            TransferFrom {
                owner,
                spender,
                destination,
                amount,
                signer,
                application_id,
                callback,
            } => {
                let maybe_message = self
                    .state
                    .system
                    .transfer_from(
                        signer,
                        Some(application_id),
                        owner,
                        spender,
                        destination,
                        amount,
                    )
                    .await?;
                self.txn_tracker.add_outgoing_messages(maybe_message);
                callback.respond(());
            }

            // --- Simple reads of system state ---
            SystemTimestamp { callback } => {
                let timestamp = *self.state.system.timestamp.get();
                callback.respond(timestamp);
            }

            ChainOwnership { callback } => {
                let ownership = self.state.system.ownership.get().clone();
                callback.respond(ownership);
            }

            ApplicationPermissions { callback } => {
                let permissions = self.state.system.application_permissions.get().clone();
                callback.respond(permissions);
            }

            ReadApplicationDescription {
                application_id,
                callback,
            } => {
                let blob_id = application_id.description_blob_id();
                // Prefer the blob content held by the transaction tracker; otherwise read the
                // blob from the system state and record that it was used.
                let description = match self.txn_tracker.get_blob_content(&blob_id) {
                    Some(blob) => bcs::from_bytes(blob.bytes())?,
                    None => {
                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
                        self.state
                            .system
                            .blob_used(self.txn_tracker, blob_id)
                            .await?;
                        bcs::from_bytes(blob_content.bytes())?
                    }
                };
                callback.respond(description);
            }

            // --- Per-application key-value storage ---
            // For the read-only requests below, an application whose entry cannot be loaded
            // (`try_load_entry` returns `None`) is answered as if its storage were empty.
            ContainsKey { id, key, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.contains_key(&key).await?,
                    None => false,
                };
                callback.respond(result);
            }

            ContainsKeys { id, keys, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.contains_keys(&keys).await?,
                    None => vec![false; keys.len()],
                };
                callback.respond(result);
            }

            ReadMultiValuesBytes { id, keys, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let values = match view {
                    Some(view) => view.multi_get(&keys).await?,
                    None => vec![None; keys.len()],
                };
                callback.respond(values);
            }

            ReadValueBytes { id, key, callback } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.get(&key).await?,
                    None => None,
                };
                callback.respond(result);
            }

            FindKeysByPrefix {
                id,
                key_prefix,
                callback,
            } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
                    None => Vec::new(),
                };
                callback.respond(result);
            }

            FindKeyValuesByPrefix {
                id,
                key_prefix,
                callback,
            } => {
                let view = self.state.users.try_load_entry(&id).await?;
                let result = match view {
                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
                    None => Vec::new(),
                };
                callback.respond(result);
            }

            WriteBatch {
                id,
                batch,
                callback,
            } => {
                let mut view = self.state.users.try_load_entry_mut(&id).await?;
                view.write_batch(batch).await?;
                callback.respond(());
            }

            // --- Chain management ---
            OpenChain {
                ownership,
                balance,
                parent_id,
                block_height,
                application_permissions,
                timestamp,
                callback,
            } => {
                let config = OpenChainConfig {
                    ownership,
                    balance,
                    application_permissions,
                };
                let chain_id = self
                    .state
                    .system
                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
                    .await?;
                callback.respond(chain_id);
            }

            CloseChain {
                application_id,
                callback,
            } => {
                // The authorization failure is reported through the callback, not as an error
                // of this function.
                let app_permissions = self.state.system.application_permissions.get();
                if !app_permissions.can_manage_chain(&application_id) {
                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
                } else {
                    self.state.system.close_chain();
                    callback.respond(Ok(()));
                }
            }

            ChangeOwnership {
                application_id,
                ownership,
                callback,
            } => {
                let app_permissions = self.state.system.application_permissions.get();
                if !app_permissions.can_manage_chain(&application_id) {
                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
                } else {
                    self.state.system.ownership.set(ownership);
                    callback.respond(Ok(()));
                }
            }

            ChangeApplicationPermissions {
                application_id,
                application_permissions,
                callback,
            } => {
                let app_permissions = self.state.system.application_permissions.get();
                if !app_permissions.can_manage_chain(&application_id) {
                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
                } else {
                    self.state
                        .system
                        .application_permissions
                        .set(application_permissions);
                    callback.respond(Ok(()));
                }
            }

            // --- Application creation ---
            PeekApplicationIndex { callback } => {
                let index = self.txn_tracker.peek_application_index();
                callback.respond(index)
            }

            CreateApplication {
                chain_id,
                block_height,
                module_id,
                parameters,
                required_application_ids,
                callback,
            } => {
                let create_application_result = self
                    .state
                    .system
                    .create_application(
                        chain_id,
                        block_height,
                        module_id,
                        parameters,
                        required_application_ids,
                        self.txn_tracker,
                    )
                    .await?;
                callback.respond(create_application_result);
            }

            // --- Oracles, blobs and events ---
            PerformHttpRequest {
                request,
                http_responses_are_oracle_responses,
                callback,
            } => {
                let system = &mut self.state.system;
                // The closure performs the actual HTTP request; `oracle` decides whether it is
                // invoked (NOTE(review): presumably it is skipped when a recorded oracle
                // response is being replayed — confirm in `TransactionTracker::oracle`).
                let response = self
                    .txn_tracker
                    .oracle(|| async {
                        let headers = request
                            .headers
                            .into_iter()
                            .map(|http::Header { name, value }| {
                                Ok((name.parse()?, value.try_into()?))
                            })
                            .collect::<Result<HeaderMap, ExecutionError>>()?;

                        let url = Url::parse(&request.url)?;
                        let host = url
                            .host_str()
                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;

                        // Without a current committee there is no allow list, so the request
                        // is rejected as unauthorized.
                        let (_epoch, committee) = system
                            .current_committee()
                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
                        let allowed_hosts = &committee.policy().http_request_allow_list;

                        ensure!(
                            allowed_hosts.contains(host),
                            ExecutionError::UnauthorizedHttpRequest(url)
                        );

                        let request = Client::new()
                            .request(request.method.into(), url)
                            .body(request.body)
                            .headers(headers);
                        // The request timeout is only applied outside the `web` configuration.
                        #[cfg(not(web))]
                        let request = request.timeout(linera_base::time::Duration::from_millis(
                            committee.policy().http_request_timeout_ms,
                        ));

                        let response = request.send().await?;

                        let mut response_size_limit =
                            committee.policy().maximum_http_response_bytes;

                        // When the response counts as an oracle response, the tighter of the
                        // two limits applies.
                        if http_responses_are_oracle_responses {
                            response_size_limit = response_size_limit
                                .min(committee.policy().maximum_oracle_response_bytes);
                        }
                        Ok(OracleResponse::Http(
                            Self::receive_http_response(response, response_size_limit).await?,
                        ))
                    })
                    .await?
                    .to_http_response()?;
                callback.respond(response);
            }

            ReadBlobContent { blob_id, callback } => {
                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
                    content.clone()
                } else {
                    let content = self.state.system.read_blob_content(blob_id).await?;
                    // Only reads of `Data` blobs are tracked by the resource controller.
                    if blob_id.blob_type == BlobType::Data {
                        self.resource_controller
                            .with_state(&mut self.state.system)
                            .await?
                            .track_blob_read(content.bytes().len() as u64)?;
                    }
                    self.state
                        .system
                        .blob_used(self.txn_tracker, blob_id)
                        .await?;
                    content
                };
                callback.respond(content)
            }

            AssertBlobExists { blob_id, callback } => {
                self.state.system.assert_blob_exists(blob_id).await?;
                // For `Data` blobs the check is tracked as a blob read of zero bytes.
                if blob_id.blob_type == BlobType::Data {
                    self.resource_controller
                        .with_state(&mut self.state.system)
                        .await?
                        .track_blob_read(0)?;
                }
                let is_new = self
                    .state
                    .system
                    .blob_used(self.txn_tracker, blob_id)
                    .await?;
                if is_new {
                    self.txn_tracker
                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
                }
                callback.respond(());
            }

            Emit {
                stream_id,
                value,
                callback,
            } => {
                let count = self
                    .state
                    .system
                    .stream_event_counts
                    .get_mut_or_default(&stream_id)
                    .await?;
                // The event gets the current count as its index; the count is then bumped
                // with overflow checking.
                let index = *count;
                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
                self.resource_controller
                    .with_state(&mut self.state.system)
                    .await?
                    .track_event_published(&value)?;
                self.txn_tracker.add_event(stream_id, index, value);
                callback.respond(index)
            }

            ReadEvent { event_id, callback } => {
                let context = self.state.context();
                let extra = context.extra();
                let event = self
                    .txn_tracker
                    .oracle(|| async {
                        let event = extra
                            .get_event(event_id.clone())
                            .await?
                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
                        Ok(OracleResponse::Event(event_id.clone(), event))
                    })
                    .await?
                    .to_event(&event_id)?;
                self.resource_controller
                    .with_state(&mut self.state.system)
                    .await?
                    .track_event_read(event.len() as u64)?;
                callback.respond(event);
            }

            SubscribeToEvents {
                chain_id,
                stream_id,
                subscriber_app_id,
                callback,
            } => {
                let subscriptions = self
                    .state
                    .system
                    .event_subscriptions
                    .get_mut_or_default(&(chain_id, stream_id.clone()))
                    .await?;
                // `insert` returns `true` only for a new subscriber; an application that was
                // already subscribed gets `0` here.
                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
                    subscriptions.next_index
                } else {
                    0
                };
                // NOTE(review): the last two arguments are presumably the previous and next
                // event indices to process — confirm against `add_stream_to_process`.
                self.txn_tracker.add_stream_to_process(
                    subscriber_app_id,
                    chain_id,
                    stream_id,
                    0,
                    next_index,
                );
                callback.respond(());
            }

            UnsubscribeFromEvents {
                chain_id,
                stream_id,
                subscriber_app_id,
                callback,
            } => {
                let key = (chain_id, stream_id.clone());
                let subscriptions = self
                    .state
                    .system
                    .event_subscriptions
                    .get_mut_or_default(&key)
                    .await?;
                subscriptions.applications.remove(&subscriber_app_id);
                // Drop the whole entry once no application is subscribed any more.
                if subscriptions.applications.is_empty() {
                    self.state.system.event_subscriptions.remove(&key)?;
                }
                // NOTE(review): the pending stream is removed under the *stream's* application
                // ID, not `subscriber_app_id` — confirm this is intended.
                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
                    self.txn_tracker
                        .remove_stream_to_process(app_id, chain_id, stream_id);
                }
                callback.respond(());
            }

            GetApplicationPermissions { callback } => {
                let app_permissions = self.state.system.application_permissions.get();
                callback.respond(app_permissions.clone());
            }

            QueryServiceOracle {
                deadline,
                application_id,
                next_block_height,
                query,
                callback,
            } => {
                let state = &mut self.state;
                let local_time = self.txn_tracker.local_time();
                let created_blobs = self.txn_tracker.created_blobs().clone();
                let bytes = self
                    .txn_tracker
                    .oracle(|| async {
                        let context = QueryContext {
                            chain_id: state.context().extra().chain_id(),
                            next_block_height,
                            local_time,
                        };
                        let QueryOutcome {
                            response,
                            operations,
                        } = Box::pin(state.query_user_application_with_deadline(
                            application_id,
                            context,
                            query,
                            deadline,
                            created_blobs,
                        ))
                        .await?;
                        // A service queried as an oracle must not schedule operations.
                        ensure!(
                            operations.is_empty(),
                            ExecutionError::ServiceOracleQueryOperations(operations)
                        );
                        Ok(OracleResponse::Service(response))
                    })
                    .await?
                    .to_service_response()?;
                callback.respond(bytes);
            }

            // --- Transaction tracker bookkeeping ---
            AddOutgoingMessage { message, callback } => {
                self.txn_tracker.add_outgoing_message(message);
                callback.respond(());
            }

            SetLocalTime {
                local_time,
                callback,
            } => {
                self.txn_tracker.set_local_time(local_time);
                callback.respond(());
            }

            AssertBefore {
                timestamp,
                callback,
            } => {
                // The local time is only checked when `replay_oracle_response` returns `false`;
                // the outcome is delivered through the callback.
                let result = if !self
                    .txn_tracker
                    .replay_oracle_response(OracleResponse::Assert)?
                {
                    let local_time = self.txn_tracker.local_time();
                    if local_time >= timestamp {
                        Err(ExecutionError::AssertBefore {
                            timestamp,
                            local_time,
                        })
                    } else {
                        Ok(())
                    }
                } else {
                    Ok(())
                };
                callback.respond(result);
            }

            AddCreatedBlob { blob, callback } => {
                // While the controller is in "free" mode, the created blob is marked free too.
                if self.resource_controller.is_free {
                    self.txn_tracker.mark_blob_free(blob.id());
                }
                self.txn_tracker.add_created_blob(blob);
                callback.respond(());
            }

            ValidationRound { round, callback } => {
                let validation_round = self
                    .txn_tracker
                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
                    .await?
                    .to_round()?;
                callback.respond(validation_round);
            }

            TotalStorageSize {
                application,
                callback,
            } => {
                let view = self.state.users.try_load_entry(&application).await?;
                // Reported as a `(key size, value size)` pair; `(0, 0)` without an entry.
                let result = match view {
                    Some(view) => {
                        let total_size = view.total_size();
                        (total_size.key, total_size.value)
                    }
                    None => (0, 0),
                };
                callback.respond(result);
            }

            AllowApplicationLogs { callback } => {
                let allow = self
                    .state
                    .context()
                    .extra()
                    .execution_runtime_config()
                    .allow_application_logs;
                callback.respond(allow);
            }

            // Only compiled for the `web` configuration: forwards an application log line to
            // `tracing`, mapping `Trace` and `Debug` both to `debug!`. No callback is involved.
            #[cfg(web)]
            Log { message, level } => match level {
                tracing::log::Level::Trace | tracing::log::Level::Debug => {
                    tracing::debug!(target: "user_application_log", message = %message);
                }
                tracing::log::Level::Info => {
                    tracing::info!(target: "user_application_log", message = %message);
                }
                tracing::log::Level::Warn => {
                    tracing::warn!(target: "user_application_log", message = %message);
                }
                tracing::log::Level::Error => {
                    tracing::error!(target: "user_application_log", message = %message);
                }
            },
        }

        Ok(())
    }
839
840 #[instrument(skip_all)]
843 async fn process_subscriptions(
844 &mut self,
845 context: ProcessStreamsContext,
846 ) -> Result<(), ExecutionError> {
847 let mut processed = BTreeSet::new();
850 loop {
851 let to_process = self
852 .txn_tracker
853 .take_streams_to_process()
854 .into_iter()
855 .filter_map(|(app_id, updates)| {
856 let updates = updates
857 .into_iter()
858 .filter_map(|update| {
859 if !processed.insert((
860 app_id,
861 update.chain_id,
862 update.stream_id.clone(),
863 )) {
864 return None;
865 }
866 Some(update)
867 })
868 .collect::<Vec<_>>();
869 if updates.is_empty() {
870 return None;
871 }
872 Some((app_id, updates))
873 })
874 .collect::<BTreeMap<_, _>>();
875 if to_process.is_empty() {
876 return Ok(());
877 }
878 for (app_id, updates) in to_process {
879 self.run_user_action(
880 app_id,
881 UserAction::ProcessStreams(context, updates),
882 None,
883 None,
884 )
885 .await?;
886 }
887 }
888 }
889
890 pub(crate) async fn run_user_action(
891 &mut self,
892 application_id: ApplicationId,
893 action: UserAction,
894 refund_grant_to: Option<Account>,
895 grant: Option<&mut Amount>,
896 ) -> Result<(), ExecutionError> {
897 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
898 .await
899 }
900
901 pub(crate) async fn service_and_dependencies(
903 &mut self,
904 application: ApplicationId,
905 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
906 let mut stack = vec![application];
909 let mut codes = vec![];
910 let mut descriptions = vec![];
911
912 while let Some(id) = stack.pop() {
913 let (code, description) = self.load_service(id).await?;
914 stack.extend(description.required_application_ids.iter().rev().copied());
915 codes.push(code);
916 descriptions.push(description);
917 }
918
919 codes.reverse();
920 descriptions.reverse();
921
922 Ok((codes, descriptions))
923 }
924
925 #[instrument(skip_all, fields(application_id = %application))]
927 async fn contract_and_dependencies(
928 &mut self,
929 application: ApplicationId,
930 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
931 let mut stack = vec![application];
934 let mut codes = vec![];
935 let mut descriptions = vec![];
936
937 while let Some(id) = stack.pop() {
938 let (code, description) = self.load_contract(id).await?;
939 stack.extend(description.required_application_ids.iter().rev().copied());
940 codes.push(code);
941 descriptions.push(description);
942 }
943
944 codes.reverse();
945 descriptions.reverse();
946
947 Ok((codes, descriptions))
948 }
949
950 #[instrument(skip_all, fields(application_id = %application_id))]
951 async fn run_user_action_with_runtime(
952 &mut self,
953 application_id: ApplicationId,
954 action: UserAction,
955 refund_grant_to: Option<Account>,
956 grant: Option<&mut Amount>,
957 ) -> Result<(), ExecutionError> {
958 let chain_id = self.state.context().extra().chain_id();
959 let mut cloned_grant = grant.as_ref().map(|x| **x);
960 let initial_balance = self
961 .resource_controller
962 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
963 .await?
964 .balance()?;
965 let mut controller = ResourceController::new(
966 self.resource_controller.policy().clone(),
967 self.resource_controller.tracker,
968 initial_balance,
969 );
970 let is_free = matches!(
971 &action,
972 UserAction::Message(..) | UserAction::ProcessStreams(..)
973 ) && self
974 .resource_controller
975 .policy()
976 .is_free_app(&application_id);
977 controller.is_free = is_free;
978 self.resource_controller.is_free = is_free;
979 let (execution_state_sender, mut execution_state_receiver) =
980 futures::channel::mpsc::unbounded();
981
982 let (codes, descriptions): (Vec<_>, Vec<_>) =
983 self.contract_and_dependencies(application_id).await?;
984
985 let allow_application_logs = self
986 .state
987 .context()
988 .extra()
989 .execution_runtime_config()
990 .allow_application_logs;
991
992 let contract_runtime_task = self
993 .state
994 .context()
995 .extra()
996 .thread_pool()
997 .run_send(JsVec(codes), move |codes| async move {
998 let runtime = ContractSyncRuntime::new(
999 execution_state_sender,
1000 chain_id,
1001 refund_grant_to,
1002 controller,
1003 &action,
1004 allow_application_logs,
1005 );
1006
1007 for (code, description) in codes.0.into_iter().zip(descriptions) {
1008 runtime.preload_contract(ApplicationId::from(&description), code, description);
1009 }
1010
1011 runtime.run_action(application_id, chain_id, action)
1012 })
1013 .await;
1014
1015 async {
1016 while let Some(request) = execution_state_receiver.next().await {
1017 self.handle_request(request).await?;
1018 }
1019 Ok::<(), ExecutionError>(())
1020 }
1021 .instrument(info_span!("handle_runtime_requests"))
1022 .await?;
1023
1024 let (result, controller) = contract_runtime_task.await??;
1025
1026 self.resource_controller.is_free = false;
1027
1028 self.txn_tracker.add_operation_result(result);
1029
1030 self.resource_controller
1031 .with_state_and_grant(&mut self.state.system, grant)
1032 .await?
1033 .merge_balance(initial_balance, controller.balance()?)?;
1034 self.resource_controller.tracker = controller.tracker;
1035
1036 Ok(())
1037 }
1038
1039 #[instrument(skip_all, fields(
1040 chain_id = %context.chain_id,
1041 block_height = %context.height,
1042 operation_type = %operation.as_ref(),
1043 ))]
1044 pub async fn execute_operation(
1045 &mut self,
1046 context: OperationContext,
1047 operation: Operation,
1048 ) -> Result<(), ExecutionError> {
1049 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1050 match operation {
1051 Operation::System(op) => {
1052 let new_application = self
1053 .state
1054 .system
1055 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
1056 .await?;
1057 if let Some((application_id, argument)) = new_application {
1058 let user_action = UserAction::Instantiate(context, argument);
1059 self.run_user_action(
1060 application_id,
1061 user_action,
1062 context.refund_grant_to(),
1063 None,
1064 )
1065 .await?;
1066 }
1067 }
1068 Operation::User {
1069 application_id,
1070 bytes,
1071 } => {
1072 self.run_user_action(
1073 application_id,
1074 UserAction::Operation(context, bytes),
1075 context.refund_grant_to(),
1076 None,
1077 )
1078 .await?;
1079 }
1080 }
1081 self.process_subscriptions(context.into()).await?;
1082 Ok(())
1083 }
1084
1085 #[instrument(skip_all, fields(
1086 chain_id = %context.chain_id,
1087 block_height = %context.height,
1088 origin = %context.origin,
1089 is_bouncing = %context.is_bouncing,
1090 message_type = %message.as_ref(),
1091 ))]
1092 pub async fn execute_message(
1093 &mut self,
1094 context: MessageContext,
1095 message: Message,
1096 grant: Option<&mut Amount>,
1097 ) -> Result<(), ExecutionError> {
1098 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1099 match message {
1100 Message::System(message) => {
1101 let outcome = self.state.system.execute_message(context, message).await?;
1102 self.txn_tracker.add_outgoing_messages(outcome);
1103 }
1104 Message::User {
1105 application_id,
1106 bytes,
1107 } => {
1108 self.run_user_action(
1109 application_id,
1110 UserAction::Message(context, bytes),
1111 context.refund_grant_to,
1112 grant,
1113 )
1114 .await?;
1115 }
1116 }
1117 self.process_subscriptions(context.into()).await?;
1118 Ok(())
1119 }
1120
1121 pub fn bounce_message(
1122 &mut self,
1123 context: MessageContext,
1124 grant: Amount,
1125 message: Message,
1126 ) -> Result<(), ExecutionError> {
1127 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1128 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1129 destination: context.origin,
1130 authenticated_owner: context.authenticated_owner,
1131 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1132 grant,
1133 kind: MessageKind::Bouncing,
1134 message,
1135 });
1136 Ok(())
1137 }
1138
1139 pub fn send_refund(
1140 &mut self,
1141 context: MessageContext,
1142 amount: Amount,
1143 ) -> Result<(), ExecutionError> {
1144 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1145 if amount.is_zero() {
1146 return Ok(());
1147 }
1148 let Some(account) = context.refund_grant_to else {
1149 return Err(ExecutionError::InternalError(
1150 "Messages with grants should have a non-empty `refund_grant_to`",
1151 ));
1152 };
1153 let message = SystemMessage::Credit {
1154 amount,
1155 source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1156 target: account.owner,
1157 };
1158 self.txn_tracker.add_outgoing_message(
1159 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1160 );
1161 Ok(())
1162 }
1163
1164 async fn receive_http_response(
1168 response: reqwest::Response,
1169 size_limit: u64,
1170 ) -> Result<http::Response, ExecutionError> {
1171 let status = response.status().as_u16();
1172 let maybe_content_length = response.content_length();
1173
1174 let headers = response
1175 .headers()
1176 .iter()
1177 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1178 .collect::<Vec<_>>();
1179
1180 let total_header_size = headers
1181 .iter()
1182 .map(|header| (header.name.len() + header.value.len()) as u64)
1183 .sum();
1184
1185 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1186 ExecutionError::HttpResponseSizeLimitExceeded {
1187 limit: size_limit,
1188 size: total_header_size,
1189 },
1190 )?;
1191
1192 if let Some(content_length) = maybe_content_length {
1193 if content_length > remaining_bytes {
1194 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1195 limit: size_limit,
1196 size: content_length + total_header_size,
1197 });
1198 }
1199 }
1200
1201 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1202 let mut body_stream = response.bytes_stream();
1203
1204 while let Some(bytes) = body_stream.next().await.transpose()? {
1205 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1206 ExecutionError::HttpResponseSizeLimitExceeded {
1207 limit: size_limit,
1208 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1209 },
1210 )?;
1211
1212 body.extend(&bytes);
1213 }
1214
1215 Ok(http::Response {
1216 status,
1217 headers,
1218 body,
1219 })
1220 }
1221}
1222
1223#[derive(Debug, strum::AsRefStr)]
1225pub enum ExecutionRequest {
1226 #[cfg(not(web))]
1227 LoadContract {
1228 id: ApplicationId,
1229 #[debug(skip)]
1230 callback: Sender<(UserContractCode, ApplicationDescription)>,
1231 },
1232
1233 #[cfg(not(web))]
1234 LoadService {
1235 id: ApplicationId,
1236 #[debug(skip)]
1237 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1238 },
1239
1240 ChainBalance {
1241 #[debug(skip)]
1242 callback: Sender<Amount>,
1243 },
1244
1245 OwnerBalance {
1246 owner: AccountOwner,
1247 #[debug(skip)]
1248 callback: Sender<Amount>,
1249 },
1250
1251 OwnerBalances {
1252 #[debug(skip)]
1253 callback: Sender<Vec<(AccountOwner, Amount)>>,
1254 },
1255
1256 BalanceOwners {
1257 #[debug(skip)]
1258 callback: Sender<Vec<AccountOwner>>,
1259 },
1260
1261 Allowance {
1262 owner: AccountOwner,
1263 spender: AccountOwner,
1264 #[debug(skip)]
1265 callback: Sender<Amount>,
1266 },
1267
1268 Allowances {
1269 #[debug(skip)]
1270 callback: Sender<Vec<(AccountOwner, AccountOwner, Amount)>>,
1271 },
1272
1273 Transfer {
1274 source: AccountOwner,
1275 destination: Account,
1276 amount: Amount,
1277 #[debug(skip_if = Option::is_none)]
1278 signer: Option<AccountOwner>,
1279 application_id: ApplicationId,
1280 #[debug(skip)]
1281 callback: Sender<()>,
1282 },
1283
1284 Claim {
1285 source: Account,
1286 destination: Account,
1287 amount: Amount,
1288 #[debug(skip_if = Option::is_none)]
1289 signer: Option<AccountOwner>,
1290 application_id: ApplicationId,
1291 #[debug(skip)]
1292 callback: Sender<()>,
1293 },
1294
1295 Approve {
1296 owner: AccountOwner,
1297 spender: AccountOwner,
1298 amount: Amount,
1299 #[debug(skip_if = Option::is_none)]
1300 signer: Option<AccountOwner>,
1301 application_id: ApplicationId,
1302 #[debug(skip)]
1303 callback: Sender<()>,
1304 },
1305
1306 TransferFrom {
1307 owner: AccountOwner,
1308 spender: AccountOwner,
1309 destination: Account,
1310 amount: Amount,
1311 #[debug(skip_if = Option::is_none)]
1312 signer: Option<AccountOwner>,
1313 application_id: ApplicationId,
1314 #[debug(skip)]
1315 callback: Sender<()>,
1316 },
1317
1318 SystemTimestamp {
1319 #[debug(skip)]
1320 callback: Sender<Timestamp>,
1321 },
1322
1323 ChainOwnership {
1324 #[debug(skip)]
1325 callback: Sender<ChainOwnership>,
1326 },
1327
1328 ApplicationPermissions {
1329 #[debug(skip)]
1330 callback: Sender<ApplicationPermissions>,
1331 },
1332
1333 ReadApplicationDescription {
1334 application_id: ApplicationId,
1335 #[debug(skip)]
1336 callback: Sender<ApplicationDescription>,
1337 },
1338
1339 ReadValueBytes {
1340 id: ApplicationId,
1341 #[debug(with = hex_debug)]
1342 key: Vec<u8>,
1343 #[debug(skip)]
1344 callback: Sender<Option<Vec<u8>>>,
1345 },
1346
1347 ContainsKey {
1348 id: ApplicationId,
1349 key: Vec<u8>,
1350 #[debug(skip)]
1351 callback: Sender<bool>,
1352 },
1353
1354 ContainsKeys {
1355 id: ApplicationId,
1356 #[debug(with = hex_vec_debug)]
1357 keys: Vec<Vec<u8>>,
1358 callback: Sender<Vec<bool>>,
1359 },
1360
1361 ReadMultiValuesBytes {
1362 id: ApplicationId,
1363 #[debug(with = hex_vec_debug)]
1364 keys: Vec<Vec<u8>>,
1365 #[debug(skip)]
1366 callback: Sender<Vec<Option<Vec<u8>>>>,
1367 },
1368
1369 FindKeysByPrefix {
1370 id: ApplicationId,
1371 #[debug(with = hex_debug)]
1372 key_prefix: Vec<u8>,
1373 #[debug(skip)]
1374 callback: Sender<Vec<Vec<u8>>>,
1375 },
1376
1377 FindKeyValuesByPrefix {
1378 id: ApplicationId,
1379 #[debug(with = hex_debug)]
1380 key_prefix: Vec<u8>,
1381 #[debug(skip)]
1382 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1383 },
1384
1385 WriteBatch {
1386 id: ApplicationId,
1387 batch: Batch,
1388 #[debug(skip)]
1389 callback: Sender<()>,
1390 },
1391
1392 OpenChain {
1393 ownership: ChainOwnership,
1394 #[debug(skip_if = Amount::is_zero)]
1395 balance: Amount,
1396 parent_id: ChainId,
1397 block_height: BlockHeight,
1398 application_permissions: ApplicationPermissions,
1399 timestamp: Timestamp,
1400 #[debug(skip)]
1401 callback: Sender<ChainId>,
1402 },
1403
1404 CloseChain {
1405 application_id: ApplicationId,
1406 #[debug(skip)]
1407 callback: Sender<Result<(), ExecutionError>>,
1408 },
1409
1410 ChangeOwnership {
1411 application_id: ApplicationId,
1412 ownership: ChainOwnership,
1413 #[debug(skip)]
1414 callback: Sender<Result<(), ExecutionError>>,
1415 },
1416
1417 ChangeApplicationPermissions {
1418 application_id: ApplicationId,
1419 application_permissions: ApplicationPermissions,
1420 #[debug(skip)]
1421 callback: Sender<Result<(), ExecutionError>>,
1422 },
1423
1424 PeekApplicationIndex {
1425 #[debug(skip)]
1426 callback: Sender<u32>,
1427 },
1428
1429 CreateApplication {
1430 chain_id: ChainId,
1431 block_height: BlockHeight,
1432 module_id: ModuleId,
1433 parameters: Vec<u8>,
1434 required_application_ids: Vec<ApplicationId>,
1435 #[debug(skip)]
1436 callback: Sender<CreateApplicationResult>,
1437 },
1438
1439 PerformHttpRequest {
1440 request: http::Request,
1441 http_responses_are_oracle_responses: bool,
1442 #[debug(skip)]
1443 callback: Sender<http::Response>,
1444 },
1445
1446 ReadBlobContent {
1447 blob_id: BlobId,
1448 #[debug(skip)]
1449 callback: Sender<BlobContent>,
1450 },
1451
1452 AssertBlobExists {
1453 blob_id: BlobId,
1454 #[debug(skip)]
1455 callback: Sender<()>,
1456 },
1457
1458 Emit {
1459 stream_id: StreamId,
1460 #[debug(with = hex_debug)]
1461 value: Vec<u8>,
1462 #[debug(skip)]
1463 callback: Sender<u32>,
1464 },
1465
1466 ReadEvent {
1467 event_id: EventId,
1468 callback: oneshot::Sender<Vec<u8>>,
1469 },
1470
1471 SubscribeToEvents {
1472 chain_id: ChainId,
1473 stream_id: StreamId,
1474 subscriber_app_id: ApplicationId,
1475 #[debug(skip)]
1476 callback: Sender<()>,
1477 },
1478
1479 UnsubscribeFromEvents {
1480 chain_id: ChainId,
1481 stream_id: StreamId,
1482 subscriber_app_id: ApplicationId,
1483 #[debug(skip)]
1484 callback: Sender<()>,
1485 },
1486
1487 GetApplicationPermissions {
1488 #[debug(skip)]
1489 callback: Sender<ApplicationPermissions>,
1490 },
1491
1492 QueryServiceOracle {
1493 deadline: Option<Instant>,
1494 application_id: ApplicationId,
1495 next_block_height: BlockHeight,
1496 query: Vec<u8>,
1497 #[debug(skip)]
1498 callback: Sender<Vec<u8>>,
1499 },
1500
1501 AddOutgoingMessage {
1502 message: crate::OutgoingMessage,
1503 #[debug(skip)]
1504 callback: Sender<()>,
1505 },
1506
1507 SetLocalTime {
1508 local_time: Timestamp,
1509 #[debug(skip)]
1510 callback: Sender<()>,
1511 },
1512
1513 AssertBefore {
1514 timestamp: Timestamp,
1515 #[debug(skip)]
1516 callback: Sender<Result<(), ExecutionError>>,
1517 },
1518
1519 AddCreatedBlob {
1520 blob: crate::Blob,
1521 #[debug(skip)]
1522 callback: Sender<()>,
1523 },
1524
1525 ValidationRound {
1526 round: Option<u32>,
1527 #[debug(skip)]
1528 callback: Sender<Option<u32>>,
1529 },
1530
1531 TotalStorageSize {
1532 application: ApplicationId,
1533 #[debug(skip)]
1534 callback: Sender<(u32, u32)>,
1535 },
1536
1537 AllowApplicationLogs {
1538 #[debug(skip)]
1539 callback: Sender<bool>,
1540 },
1541
1542 #[cfg(web)]
1544 Log {
1545 message: String,
1546 level: tracing::log::Level,
1547 },
1548}