1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20 time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27 execution::UserAction,
28 runtime::ContractSyncRuntime,
29 system::{CreateApplicationResult, OpenChainConfig},
30 util::{OracleResponseExt as _, RespondExt as _},
31 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
32 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
33 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
34 ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
35};
36
37pub struct ExecutionStateActor<'a, C> {
39 state: &'a mut ExecutionStateView<C>,
40 txn_tracker: &'a mut TransactionTracker,
41 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
42}
43
44#[cfg(with_metrics)]
45mod metrics {
46 use std::sync::LazyLock;
47
48 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
49 use prometheus::HistogramVec;
50
51 pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
53 register_histogram_vec(
54 "load_contract_latency",
55 "Load contract latency",
56 &[],
57 exponential_bucket_latencies(250.0),
58 )
59 });
60
61 pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
63 register_histogram_vec(
64 "load_service_latency",
65 "Load service latency",
66 &[],
67 exponential_bucket_latencies(250.0),
68 )
69 });
70}
71
72pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
73
74impl<'a, C> ExecutionStateActor<'a, C>
75where
76 C: Context + Clone + 'static,
77 C::Extra: ExecutionRuntimeContext,
78{
79 pub fn new(
81 state: &'a mut ExecutionStateView<C>,
82 txn_tracker: &'a mut TransactionTracker,
83 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
84 ) -> Self {
85 Self {
86 state,
87 txn_tracker,
88 resource_controller,
89 }
90 }
91
92 pub(crate) async fn load_contract(
93 &mut self,
94 id: ApplicationId,
95 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
96 #[cfg(with_metrics)]
97 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
98 let blob_id = id.description_blob_id();
99 let description = match self.txn_tracker.get_blob_content(&blob_id) {
100 Some(blob) => bcs::from_bytes(blob.bytes())?,
101 None => {
102 self.state
103 .system
104 .describe_application(id, self.txn_tracker)
105 .await?
106 }
107 };
108 let code = self
109 .state
110 .context()
111 .extra()
112 .get_user_contract(&description, self.txn_tracker)
113 .await?;
114 Ok((code, description))
115 }
116
117 pub(crate) async fn load_service(
118 &mut self,
119 id: ApplicationId,
120 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
121 #[cfg(with_metrics)]
122 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
123 let blob_id = id.description_blob_id();
124 let description = match self.txn_tracker.get_blob_content(&blob_id) {
125 Some(blob) => bcs::from_bytes(blob.bytes())?,
126 None => {
127 self.state
128 .system
129 .describe_application(id, self.txn_tracker)
130 .await?
131 }
132 };
133 let code = self
134 .state
135 .context()
136 .extra()
137 .get_user_service(&description, self.txn_tracker)
138 .await?;
139 Ok((code, description))
140 }
141
142 pub(crate) async fn handle_request(
144 &mut self,
145 request: ExecutionRequest,
146 ) -> Result<(), ExecutionError> {
147 use ExecutionRequest::*;
148 match request {
149 #[cfg(not(web))]
150 LoadContract { id, callback } => {
151 let (code, description) = self.load_contract(id).await?;
152 callback.respond((code, description))
153 }
154 #[cfg(not(web))]
155 LoadService { id, callback } => {
156 let (code, description) = self.load_service(id).await?;
157 callback.respond((code, description))
158 }
159
160 ChainBalance { callback } => {
161 let balance = *self.state.system.balance.get();
162 callback.respond(balance);
163 }
164
165 OwnerBalance { owner, callback } => {
166 let balance = self
167 .state
168 .system
169 .balances
170 .get(&owner)
171 .await?
172 .unwrap_or_default();
173 callback.respond(balance);
174 }
175
176 OwnerBalances { callback } => {
177 callback.respond(self.state.system.balances.index_values().await?);
178 }
179
180 BalanceOwners { callback } => {
181 let owners = self.state.system.balances.indices().await?;
182 callback.respond(owners);
183 }
184
185 Transfer {
186 source,
187 destination,
188 amount,
189 signer,
190 application_id,
191 callback,
192 } => {
193 let maybe_message = self
194 .state
195 .system
196 .transfer(signer, Some(application_id), source, destination, amount)
197 .await?;
198 self.txn_tracker.add_outgoing_messages(maybe_message);
199 callback.respond(());
200 }
201
202 Claim {
203 source,
204 destination,
205 amount,
206 signer,
207 application_id,
208 callback,
209 } => {
210 let maybe_message = self
211 .state
212 .system
213 .claim(
214 signer,
215 Some(application_id),
216 source.owner,
217 source.chain_id,
218 destination,
219 amount,
220 )
221 .await?;
222 self.txn_tracker.add_outgoing_messages(maybe_message);
223 callback.respond(());
224 }
225
226 SystemTimestamp { callback } => {
227 let timestamp = *self.state.system.timestamp.get();
228 callback.respond(timestamp);
229 }
230
231 ChainOwnership { callback } => {
232 let ownership = self.state.system.ownership.get().clone();
233 callback.respond(ownership);
234 }
235
236 ApplicationPermissions { callback } => {
237 let permissions = self.state.system.application_permissions.get().clone();
238 callback.respond(permissions);
239 }
240
241 ReadApplicationDescription {
242 application_id,
243 callback,
244 } => {
245 let blob_id = application_id.description_blob_id();
246 let description = match self.txn_tracker.get_blob_content(&blob_id) {
247 Some(blob) => bcs::from_bytes(blob.bytes())?,
248 None => {
249 let blob_content = self.state.system.read_blob_content(blob_id).await?;
250 self.state
251 .system
252 .blob_used(self.txn_tracker, blob_id)
253 .await?;
254 bcs::from_bytes(blob_content.bytes())?
255 }
256 };
257 callback.respond(description);
258 }
259
260 ContainsKey { id, key, callback } => {
261 let view = self.state.users.try_load_entry(&id).await?;
262 let result = match view {
263 Some(view) => view.contains_key(&key).await?,
264 None => false,
265 };
266 callback.respond(result);
267 }
268
269 ContainsKeys { id, keys, callback } => {
270 let view = self.state.users.try_load_entry(&id).await?;
271 let result = match view {
272 Some(view) => view.contains_keys(&keys).await?,
273 None => vec![false; keys.len()],
274 };
275 callback.respond(result);
276 }
277
278 ReadMultiValuesBytes { id, keys, callback } => {
279 let view = self.state.users.try_load_entry(&id).await?;
280 let values = match view {
281 Some(view) => view.multi_get(&keys).await?,
282 None => vec![None; keys.len()],
283 };
284 callback.respond(values);
285 }
286
287 ReadValueBytes { id, key, callback } => {
288 let view = self.state.users.try_load_entry(&id).await?;
289 let result = match view {
290 Some(view) => view.get(&key).await?,
291 None => None,
292 };
293 callback.respond(result);
294 }
295
296 FindKeysByPrefix {
297 id,
298 key_prefix,
299 callback,
300 } => {
301 let view = self.state.users.try_load_entry(&id).await?;
302 let result = match view {
303 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
304 None => Vec::new(),
305 };
306 callback.respond(result);
307 }
308
309 FindKeyValuesByPrefix {
310 id,
311 key_prefix,
312 callback,
313 } => {
314 let view = self.state.users.try_load_entry(&id).await?;
315 let result = match view {
316 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
317 None => Vec::new(),
318 };
319 callback.respond(result);
320 }
321
322 WriteBatch {
323 id,
324 batch,
325 callback,
326 } => {
327 let mut view = self.state.users.try_load_entry_mut(&id).await?;
328 view.write_batch(batch).await?;
329 callback.respond(());
330 }
331
332 OpenChain {
333 ownership,
334 balance,
335 parent_id,
336 block_height,
337 application_permissions,
338 timestamp,
339 callback,
340 } => {
341 let config = OpenChainConfig {
342 ownership,
343 balance,
344 application_permissions,
345 };
346 let chain_id = self
347 .state
348 .system
349 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
350 .await?;
351 callback.respond(chain_id);
352 }
353
354 CloseChain {
355 application_id,
356 callback,
357 } => {
358 let app_permissions = self.state.system.application_permissions.get();
359 if !app_permissions.can_manage_chain(&application_id) {
360 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
361 } else {
362 self.state.system.close_chain();
363 callback.respond(Ok(()));
364 }
365 }
366
367 ChangeOwnership {
368 application_id,
369 ownership,
370 callback,
371 } => {
372 let app_permissions = self.state.system.application_permissions.get();
373 if !app_permissions.can_manage_chain(&application_id) {
374 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
375 } else {
376 self.state.system.ownership.set(ownership);
377 callback.respond(Ok(()));
378 }
379 }
380
381 ChangeApplicationPermissions {
382 application_id,
383 application_permissions,
384 callback,
385 } => {
386 let app_permissions = self.state.system.application_permissions.get();
387 if !app_permissions.can_manage_chain(&application_id) {
388 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
389 } else {
390 self.state
391 .system
392 .application_permissions
393 .set(application_permissions);
394 callback.respond(Ok(()));
395 }
396 }
397
398 PeekApplicationIndex { callback } => {
399 let index = self.txn_tracker.peek_application_index();
400 callback.respond(index)
401 }
402
403 CreateApplication {
404 chain_id,
405 block_height,
406 module_id,
407 parameters,
408 required_application_ids,
409 callback,
410 } => {
411 let create_application_result = self
412 .state
413 .system
414 .create_application(
415 chain_id,
416 block_height,
417 module_id,
418 parameters,
419 required_application_ids,
420 self.txn_tracker,
421 )
422 .await?;
423 callback.respond(create_application_result);
424 }
425
426 PerformHttpRequest {
427 request,
428 http_responses_are_oracle_responses,
429 callback,
430 } => {
431 let system = &mut self.state.system;
432 let response = self
433 .txn_tracker
434 .oracle(|| async {
435 let headers = request
436 .headers
437 .into_iter()
438 .map(|http::Header { name, value }| {
439 Ok((name.parse()?, value.try_into()?))
440 })
441 .collect::<Result<HeaderMap, ExecutionError>>()?;
442
443 let url = Url::parse(&request.url)?;
444 let host = url
445 .host_str()
446 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
447
448 let (_epoch, committee) = system
449 .current_committee()
450 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
451 let allowed_hosts = &committee.policy().http_request_allow_list;
452
453 ensure!(
454 allowed_hosts.contains(host),
455 ExecutionError::UnauthorizedHttpRequest(url)
456 );
457
458 let request = Client::new()
459 .request(request.method.into(), url)
460 .body(request.body)
461 .headers(headers);
462 #[cfg(not(web))]
463 let request = request.timeout(linera_base::time::Duration::from_millis(
464 committee.policy().http_request_timeout_ms,
465 ));
466
467 let response = request.send().await?;
468
469 let mut response_size_limit =
470 committee.policy().maximum_http_response_bytes;
471
472 if http_responses_are_oracle_responses {
473 response_size_limit = response_size_limit
474 .min(committee.policy().maximum_oracle_response_bytes);
475 }
476 Ok(OracleResponse::Http(
477 Self::receive_http_response(response, response_size_limit).await?,
478 ))
479 })
480 .await?
481 .to_http_response()?;
482 callback.respond(response);
483 }
484
485 ReadBlobContent { blob_id, callback } => {
486 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
487 content.clone()
488 } else {
489 let content = self.state.system.read_blob_content(blob_id).await?;
490 if blob_id.blob_type == BlobType::Data {
491 self.resource_controller
492 .with_state(&mut self.state.system)
493 .await?
494 .track_blob_read(content.bytes().len() as u64)?;
495 }
496 self.state
497 .system
498 .blob_used(self.txn_tracker, blob_id)
499 .await?;
500 content
501 };
502 callback.respond(content)
503 }
504
505 AssertBlobExists { blob_id, callback } => {
506 self.state.system.assert_blob_exists(blob_id).await?;
507 if blob_id.blob_type == BlobType::Data {
509 self.resource_controller
510 .with_state(&mut self.state.system)
511 .await?
512 .track_blob_read(0)?;
513 }
514 let is_new = self
515 .state
516 .system
517 .blob_used(self.txn_tracker, blob_id)
518 .await?;
519 if is_new {
520 self.txn_tracker
521 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
522 }
523 callback.respond(());
524 }
525
526 Emit {
527 stream_id,
528 value,
529 callback,
530 } => {
531 let count = self
532 .state
533 .system
534 .stream_event_counts
535 .get_mut_or_default(&stream_id)
536 .await?;
537 let index = *count;
538 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
539 self.resource_controller
540 .with_state(&mut self.state.system)
541 .await?
542 .track_event_published(&value)?;
543 self.txn_tracker.add_event(stream_id, index, value);
544 callback.respond(index)
545 }
546
547 ReadEvent { event_id, callback } => {
548 let context = self.state.context();
549 let extra = context.extra();
550 let event = self
551 .txn_tracker
552 .oracle(|| async {
553 let event = extra
554 .get_event(event_id.clone())
555 .await?
556 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
557 Ok(OracleResponse::Event(event_id.clone(), event))
558 })
559 .await?
560 .to_event(&event_id)?;
561 self.resource_controller
562 .with_state(&mut self.state.system)
563 .await?
564 .track_event_read(event.len() as u64)?;
565 callback.respond(event);
566 }
567
568 SubscribeToEvents {
569 chain_id,
570 stream_id,
571 subscriber_app_id,
572 callback,
573 } => {
574 let subscriptions = self
575 .state
576 .system
577 .event_subscriptions
578 .get_mut_or_default(&(chain_id, stream_id.clone()))
579 .await?;
580 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
581 subscriptions.next_index
582 } else {
583 0
584 };
585 self.txn_tracker.add_stream_to_process(
586 subscriber_app_id,
587 chain_id,
588 stream_id,
589 0,
590 next_index,
591 );
592 callback.respond(());
593 }
594
595 UnsubscribeFromEvents {
596 chain_id,
597 stream_id,
598 subscriber_app_id,
599 callback,
600 } => {
601 let key = (chain_id, stream_id.clone());
602 let subscriptions = self
603 .state
604 .system
605 .event_subscriptions
606 .get_mut_or_default(&key)
607 .await?;
608 subscriptions.applications.remove(&subscriber_app_id);
609 if subscriptions.applications.is_empty() {
610 self.state.system.event_subscriptions.remove(&key)?;
611 }
612 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
613 self.txn_tracker
614 .remove_stream_to_process(app_id, chain_id, stream_id);
615 }
616 callback.respond(());
617 }
618
619 GetApplicationPermissions { callback } => {
620 let app_permissions = self.state.system.application_permissions.get();
621 callback.respond(app_permissions.clone());
622 }
623
624 QueryServiceOracle {
625 deadline,
626 application_id,
627 next_block_height,
628 query,
629 callback,
630 } => {
631 let state = &mut self.state;
632 let local_time = self.txn_tracker.local_time();
633 let created_blobs = self.txn_tracker.created_blobs().clone();
634 let bytes = self
635 .txn_tracker
636 .oracle(|| async {
637 let context = QueryContext {
638 chain_id: state.context().extra().chain_id(),
639 next_block_height,
640 local_time,
641 };
642 let QueryOutcome {
643 response,
644 operations,
645 } = Box::pin(state.query_user_application_with_deadline(
646 application_id,
647 context,
648 query,
649 deadline,
650 created_blobs,
651 ))
652 .await?;
653 ensure!(
654 operations.is_empty(),
655 ExecutionError::ServiceOracleQueryOperations(operations)
656 );
657 Ok(OracleResponse::Service(response))
658 })
659 .await?
660 .to_service_response()?;
661 callback.respond(bytes);
662 }
663
664 AddOutgoingMessage { message, callback } => {
665 self.txn_tracker.add_outgoing_message(message);
666 callback.respond(());
667 }
668
669 SetLocalTime {
670 local_time,
671 callback,
672 } => {
673 self.txn_tracker.set_local_time(local_time);
674 callback.respond(());
675 }
676
677 AssertBefore {
678 timestamp,
679 callback,
680 } => {
681 let result = if !self
682 .txn_tracker
683 .replay_oracle_response(OracleResponse::Assert)?
684 {
685 let local_time = self.txn_tracker.local_time();
687 if local_time >= timestamp {
688 Err(ExecutionError::AssertBefore {
689 timestamp,
690 local_time,
691 })
692 } else {
693 Ok(())
694 }
695 } else {
696 Ok(())
697 };
698 callback.respond(result);
699 }
700
701 AddCreatedBlob { blob, callback } => {
702 self.txn_tracker.add_created_blob(blob);
703 callback.respond(());
704 }
705
706 ValidationRound { round, callback } => {
707 let validation_round = self
708 .txn_tracker
709 .oracle(|| async { Ok(OracleResponse::Round(round)) })
710 .await?
711 .to_round()?;
712 callback.respond(validation_round);
713 }
714
715 TotalStorageSize {
716 application,
717 callback,
718 } => {
719 let view = self.state.users.try_load_entry(&application).await?;
720 let result = match view {
721 Some(view) => {
722 let total_size = view.total_size();
723 (total_size.key, total_size.value)
724 }
725 None => (0, 0),
726 };
727 callback.respond(result);
728 }
729
730 AllowApplicationLogs { callback } => {
731 let allow = self
732 .state
733 .context()
734 .extra()
735 .execution_runtime_config()
736 .allow_application_logs;
737 callback.respond(allow);
738 }
739
740 #[cfg(web)]
741 Log { message, level } => {
742 let formatted: js_sys::JsString = format!("[CONTRACT {level}] {message}").into();
744 match level {
745 tracing::log::Level::Trace | tracing::log::Level::Debug => {
746 web_sys::console::debug_1(&formatted)
747 }
748 tracing::log::Level::Info => web_sys::console::log_1(&formatted),
749 tracing::log::Level::Warn => web_sys::console::warn_1(&formatted),
750 tracing::log::Level::Error => web_sys::console::error_1(&formatted),
751 }
752 }
753 }
754
755 Ok(())
756 }
757
758 async fn process_subscriptions(
761 &mut self,
762 context: ProcessStreamsContext,
763 ) -> Result<(), ExecutionError> {
764 let mut processed = BTreeSet::new();
767 loop {
768 let to_process = self
769 .txn_tracker
770 .take_streams_to_process()
771 .into_iter()
772 .filter_map(|(app_id, updates)| {
773 let updates = updates
774 .into_iter()
775 .filter_map(|update| {
776 if !processed.insert((
777 app_id,
778 update.chain_id,
779 update.stream_id.clone(),
780 )) {
781 return None;
782 }
783 Some(update)
784 })
785 .collect::<Vec<_>>();
786 if updates.is_empty() {
787 return None;
788 }
789 Some((app_id, updates))
790 })
791 .collect::<BTreeMap<_, _>>();
792 if to_process.is_empty() {
793 return Ok(());
794 }
795 for (app_id, updates) in to_process {
796 self.run_user_action(
797 app_id,
798 UserAction::ProcessStreams(context, updates),
799 None,
800 None,
801 )
802 .await?;
803 }
804 }
805 }
806
807 pub(crate) async fn run_user_action(
808 &mut self,
809 application_id: ApplicationId,
810 action: UserAction,
811 refund_grant_to: Option<Account>,
812 grant: Option<&mut Amount>,
813 ) -> Result<(), ExecutionError> {
814 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
815 .await
816 }
817
818 pub(crate) async fn service_and_dependencies(
820 &mut self,
821 application: ApplicationId,
822 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
823 let mut stack = vec![application];
826 let mut codes = vec![];
827 let mut descriptions = vec![];
828
829 while let Some(id) = stack.pop() {
830 let (code, description) = self.load_service(id).await?;
831 stack.extend(description.required_application_ids.iter().rev().copied());
832 codes.push(code);
833 descriptions.push(description);
834 }
835
836 codes.reverse();
837 descriptions.reverse();
838
839 Ok((codes, descriptions))
840 }
841
842 async fn contract_and_dependencies(
844 &mut self,
845 application: ApplicationId,
846 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
847 let mut stack = vec![application];
850 let mut codes = vec![];
851 let mut descriptions = vec![];
852
853 while let Some(id) = stack.pop() {
854 let (code, description) = self.load_contract(id).await?;
855 stack.extend(description.required_application_ids.iter().rev().copied());
856 codes.push(code);
857 descriptions.push(description);
858 }
859
860 codes.reverse();
861 descriptions.reverse();
862
863 Ok((codes, descriptions))
864 }
865
866 async fn run_user_action_with_runtime(
867 &mut self,
868 application_id: ApplicationId,
869 action: UserAction,
870 refund_grant_to: Option<Account>,
871 grant: Option<&mut Amount>,
872 ) -> Result<(), ExecutionError> {
873 let chain_id = self.state.context().extra().chain_id();
874 let mut cloned_grant = grant.as_ref().map(|x| **x);
875 let initial_balance = self
876 .resource_controller
877 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
878 .await?
879 .balance()?;
880 let controller = ResourceController::new(
881 self.resource_controller.policy().clone(),
882 self.resource_controller.tracker,
883 initial_balance,
884 );
885 let (execution_state_sender, mut execution_state_receiver) =
886 futures::channel::mpsc::unbounded();
887
888 let (codes, descriptions): (Vec<_>, Vec<_>) =
889 self.contract_and_dependencies(application_id).await?;
890
891 let allow_application_logs = self
892 .state
893 .context()
894 .extra()
895 .execution_runtime_config()
896 .allow_application_logs;
897
898 let contract_runtime_task = self
899 .state
900 .context()
901 .extra()
902 .thread_pool()
903 .run_send(JsVec(codes), move |codes| async move {
904 let runtime = ContractSyncRuntime::new(
905 execution_state_sender,
906 chain_id,
907 refund_grant_to,
908 controller,
909 &action,
910 allow_application_logs,
911 );
912
913 for (code, description) in codes.0.into_iter().zip(descriptions) {
914 runtime.preload_contract(
915 ApplicationId::from(&description),
916 code,
917 description,
918 )?;
919 }
920
921 runtime.run_action(application_id, chain_id, action)
922 })
923 .await;
924
925 while let Some(request) = execution_state_receiver.next().await {
926 self.handle_request(request).await?;
927 }
928
929 let (result, controller) = contract_runtime_task.await??;
930
931 self.txn_tracker.add_operation_result(result);
932
933 self.resource_controller
934 .with_state_and_grant(&mut self.state.system, grant)
935 .await?
936 .merge_balance(initial_balance, controller.balance()?)?;
937 self.resource_controller.tracker = controller.tracker;
938
939 Ok(())
940 }
941
942 pub async fn execute_operation(
943 &mut self,
944 context: OperationContext,
945 operation: Operation,
946 ) -> Result<(), ExecutionError> {
947 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
948 match operation {
949 Operation::System(op) => {
950 let new_application = self
951 .state
952 .system
953 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
954 .await?;
955 if let Some((application_id, argument)) = new_application {
956 let user_action = UserAction::Instantiate(context, argument);
957 self.run_user_action(
958 application_id,
959 user_action,
960 context.refund_grant_to(),
961 None,
962 )
963 .await?;
964 }
965 }
966 Operation::User {
967 application_id,
968 bytes,
969 } => {
970 self.run_user_action(
971 application_id,
972 UserAction::Operation(context, bytes),
973 context.refund_grant_to(),
974 None,
975 )
976 .await?;
977 }
978 }
979 self.process_subscriptions(context.into()).await?;
980 Ok(())
981 }
982
983 pub async fn execute_message(
984 &mut self,
985 context: MessageContext,
986 message: Message,
987 grant: Option<&mut Amount>,
988 ) -> Result<(), ExecutionError> {
989 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
990 match message {
991 Message::System(message) => {
992 let outcome = self.state.system.execute_message(context, message).await?;
993 self.txn_tracker.add_outgoing_messages(outcome);
994 }
995 Message::User {
996 application_id,
997 bytes,
998 } => {
999 self.run_user_action(
1000 application_id,
1001 UserAction::Message(context, bytes),
1002 context.refund_grant_to,
1003 grant,
1004 )
1005 .await?;
1006 }
1007 }
1008 self.process_subscriptions(context.into()).await?;
1009 Ok(())
1010 }
1011
1012 pub fn bounce_message(
1013 &mut self,
1014 context: MessageContext,
1015 grant: Amount,
1016 message: Message,
1017 ) -> Result<(), ExecutionError> {
1018 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1019 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1020 destination: context.origin,
1021 authenticated_owner: context.authenticated_owner,
1022 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1023 grant,
1024 kind: MessageKind::Bouncing,
1025 message,
1026 });
1027 Ok(())
1028 }
1029
1030 pub fn send_refund(
1031 &mut self,
1032 context: MessageContext,
1033 amount: Amount,
1034 ) -> Result<(), ExecutionError> {
1035 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1036 if amount.is_zero() {
1037 return Ok(());
1038 }
1039 let Some(account) = context.refund_grant_to else {
1040 return Err(ExecutionError::InternalError(
1041 "Messages with grants should have a non-empty `refund_grant_to`",
1042 ));
1043 };
1044 let message = SystemMessage::Credit {
1045 amount,
1046 source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1047 target: account.owner,
1048 };
1049 self.txn_tracker.add_outgoing_message(
1050 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1051 );
1052 Ok(())
1053 }
1054
1055 async fn receive_http_response(
1059 response: reqwest::Response,
1060 size_limit: u64,
1061 ) -> Result<http::Response, ExecutionError> {
1062 let status = response.status().as_u16();
1063 let maybe_content_length = response.content_length();
1064
1065 let headers = response
1066 .headers()
1067 .iter()
1068 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1069 .collect::<Vec<_>>();
1070
1071 let total_header_size = headers
1072 .iter()
1073 .map(|header| (header.name.len() + header.value.len()) as u64)
1074 .sum();
1075
1076 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1077 ExecutionError::HttpResponseSizeLimitExceeded {
1078 limit: size_limit,
1079 size: total_header_size,
1080 },
1081 )?;
1082
1083 if let Some(content_length) = maybe_content_length {
1084 if content_length > remaining_bytes {
1085 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1086 limit: size_limit,
1087 size: content_length + total_header_size,
1088 });
1089 }
1090 }
1091
1092 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1093 let mut body_stream = response.bytes_stream();
1094
1095 while let Some(bytes) = body_stream.next().await.transpose()? {
1096 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1097 ExecutionError::HttpResponseSizeLimitExceeded {
1098 limit: size_limit,
1099 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1100 },
1101 )?;
1102
1103 body.extend(&bytes);
1104 }
1105
1106 Ok(http::Response {
1107 status,
1108 headers,
1109 body,
1110 })
1111 }
1112}
1113
1114#[derive(Debug)]
1116pub enum ExecutionRequest {
1117 #[cfg(not(web))]
1118 LoadContract {
1119 id: ApplicationId,
1120 #[debug(skip)]
1121 callback: Sender<(UserContractCode, ApplicationDescription)>,
1122 },
1123
1124 #[cfg(not(web))]
1125 LoadService {
1126 id: ApplicationId,
1127 #[debug(skip)]
1128 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1129 },
1130
1131 ChainBalance {
1132 #[debug(skip)]
1133 callback: Sender<Amount>,
1134 },
1135
1136 OwnerBalance {
1137 owner: AccountOwner,
1138 #[debug(skip)]
1139 callback: Sender<Amount>,
1140 },
1141
1142 OwnerBalances {
1143 #[debug(skip)]
1144 callback: Sender<Vec<(AccountOwner, Amount)>>,
1145 },
1146
1147 BalanceOwners {
1148 #[debug(skip)]
1149 callback: Sender<Vec<AccountOwner>>,
1150 },
1151
1152 Transfer {
1153 source: AccountOwner,
1154 destination: Account,
1155 amount: Amount,
1156 #[debug(skip_if = Option::is_none)]
1157 signer: Option<AccountOwner>,
1158 application_id: ApplicationId,
1159 #[debug(skip)]
1160 callback: Sender<()>,
1161 },
1162
1163 Claim {
1164 source: Account,
1165 destination: Account,
1166 amount: Amount,
1167 #[debug(skip_if = Option::is_none)]
1168 signer: Option<AccountOwner>,
1169 application_id: ApplicationId,
1170 #[debug(skip)]
1171 callback: Sender<()>,
1172 },
1173
1174 SystemTimestamp {
1175 #[debug(skip)]
1176 callback: Sender<Timestamp>,
1177 },
1178
1179 ChainOwnership {
1180 #[debug(skip)]
1181 callback: Sender<ChainOwnership>,
1182 },
1183
1184 ApplicationPermissions {
1185 #[debug(skip)]
1186 callback: Sender<ApplicationPermissions>,
1187 },
1188
1189 ReadApplicationDescription {
1190 application_id: ApplicationId,
1191 #[debug(skip)]
1192 callback: Sender<ApplicationDescription>,
1193 },
1194
1195 ReadValueBytes {
1196 id: ApplicationId,
1197 #[debug(with = hex_debug)]
1198 key: Vec<u8>,
1199 #[debug(skip)]
1200 callback: Sender<Option<Vec<u8>>>,
1201 },
1202
1203 ContainsKey {
1204 id: ApplicationId,
1205 key: Vec<u8>,
1206 #[debug(skip)]
1207 callback: Sender<bool>,
1208 },
1209
1210 ContainsKeys {
1211 id: ApplicationId,
1212 #[debug(with = hex_vec_debug)]
1213 keys: Vec<Vec<u8>>,
1214 callback: Sender<Vec<bool>>,
1215 },
1216
1217 ReadMultiValuesBytes {
1218 id: ApplicationId,
1219 #[debug(with = hex_vec_debug)]
1220 keys: Vec<Vec<u8>>,
1221 #[debug(skip)]
1222 callback: Sender<Vec<Option<Vec<u8>>>>,
1223 },
1224
1225 FindKeysByPrefix {
1226 id: ApplicationId,
1227 #[debug(with = hex_debug)]
1228 key_prefix: Vec<u8>,
1229 #[debug(skip)]
1230 callback: Sender<Vec<Vec<u8>>>,
1231 },
1232
1233 FindKeyValuesByPrefix {
1234 id: ApplicationId,
1235 #[debug(with = hex_debug)]
1236 key_prefix: Vec<u8>,
1237 #[debug(skip)]
1238 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1239 },
1240
1241 WriteBatch {
1242 id: ApplicationId,
1243 batch: Batch,
1244 #[debug(skip)]
1245 callback: Sender<()>,
1246 },
1247
1248 OpenChain {
1249 ownership: ChainOwnership,
1250 #[debug(skip_if = Amount::is_zero)]
1251 balance: Amount,
1252 parent_id: ChainId,
1253 block_height: BlockHeight,
1254 application_permissions: ApplicationPermissions,
1255 timestamp: Timestamp,
1256 #[debug(skip)]
1257 callback: Sender<ChainId>,
1258 },
1259
1260 CloseChain {
1261 application_id: ApplicationId,
1262 #[debug(skip)]
1263 callback: Sender<Result<(), ExecutionError>>,
1264 },
1265
1266 ChangeOwnership {
1267 application_id: ApplicationId,
1268 ownership: ChainOwnership,
1269 #[debug(skip)]
1270 callback: Sender<Result<(), ExecutionError>>,
1271 },
1272
1273 ChangeApplicationPermissions {
1274 application_id: ApplicationId,
1275 application_permissions: ApplicationPermissions,
1276 #[debug(skip)]
1277 callback: Sender<Result<(), ExecutionError>>,
1278 },
1279
1280 PeekApplicationIndex {
1281 #[debug(skip)]
1282 callback: Sender<u32>,
1283 },
1284
1285 CreateApplication {
1286 chain_id: ChainId,
1287 block_height: BlockHeight,
1288 module_id: ModuleId,
1289 parameters: Vec<u8>,
1290 required_application_ids: Vec<ApplicationId>,
1291 #[debug(skip)]
1292 callback: Sender<CreateApplicationResult>,
1293 },
1294
1295 PerformHttpRequest {
1296 request: http::Request,
1297 http_responses_are_oracle_responses: bool,
1298 #[debug(skip)]
1299 callback: Sender<http::Response>,
1300 },
1301
1302 ReadBlobContent {
1303 blob_id: BlobId,
1304 #[debug(skip)]
1305 callback: Sender<BlobContent>,
1306 },
1307
1308 AssertBlobExists {
1309 blob_id: BlobId,
1310 #[debug(skip)]
1311 callback: Sender<()>,
1312 },
1313
1314 Emit {
1315 stream_id: StreamId,
1316 #[debug(with = hex_debug)]
1317 value: Vec<u8>,
1318 #[debug(skip)]
1319 callback: Sender<u32>,
1320 },
1321
1322 ReadEvent {
1323 event_id: EventId,
1324 callback: oneshot::Sender<Vec<u8>>,
1325 },
1326
1327 SubscribeToEvents {
1328 chain_id: ChainId,
1329 stream_id: StreamId,
1330 subscriber_app_id: ApplicationId,
1331 #[debug(skip)]
1332 callback: Sender<()>,
1333 },
1334
1335 UnsubscribeFromEvents {
1336 chain_id: ChainId,
1337 stream_id: StreamId,
1338 subscriber_app_id: ApplicationId,
1339 #[debug(skip)]
1340 callback: Sender<()>,
1341 },
1342
1343 GetApplicationPermissions {
1344 #[debug(skip)]
1345 callback: Sender<ApplicationPermissions>,
1346 },
1347
1348 QueryServiceOracle {
1349 deadline: Option<Instant>,
1350 application_id: ApplicationId,
1351 next_block_height: BlockHeight,
1352 query: Vec<u8>,
1353 #[debug(skip)]
1354 callback: Sender<Vec<u8>>,
1355 },
1356
1357 AddOutgoingMessage {
1358 message: crate::OutgoingMessage,
1359 #[debug(skip)]
1360 callback: Sender<()>,
1361 },
1362
1363 SetLocalTime {
1364 local_time: Timestamp,
1365 #[debug(skip)]
1366 callback: Sender<()>,
1367 },
1368
1369 AssertBefore {
1370 timestamp: Timestamp,
1371 #[debug(skip)]
1372 callback: Sender<Result<(), ExecutionError>>,
1373 },
1374
1375 AddCreatedBlob {
1376 blob: crate::Blob,
1377 #[debug(skip)]
1378 callback: Sender<()>,
1379 },
1380
1381 ValidationRound {
1382 round: Option<u32>,
1383 #[debug(skip)]
1384 callback: Sender<Option<u32>>,
1385 },
1386
1387 TotalStorageSize {
1388 application: ApplicationId,
1389 #[debug(skip)]
1390 callback: Sender<(u32, u32)>,
1391 },
1392
1393 AllowApplicationLogs {
1394 #[debug(skip)]
1395 callback: Sender<bool>,
1396 },
1397
1398 #[cfg(web)]
1400 Log {
1401 message: String,
1402 level: tracing::log::Level,
1403 },
1404}