1use std::{
7 collections::{BTreeMap, BTreeSet},
8 sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16 data_types::{
17 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18 StreamUpdate, Timestamp,
19 },
20 ensure, hex_debug, hex_vec_debug, http,
21 identifiers::{
22 Account, AccountOwner, BlobId, BlobType, ChainId, EventId, GenericApplicationId,
23 OwnerSpender, StreamId,
24 },
25 ownership::ChainOwnership,
26 time::Instant,
27};
28use linera_views::{batch::Batch, context::Context, views::View};
29use oneshot::Sender;
30use reqwest::{header::HeaderMap, Client, Url};
31use tracing::{info_span, instrument, Instrument as _};
32
33use crate::{
34 execution::UserAction,
35 runtime::ContractSyncRuntime,
36 system::{CreateApplicationResult, OpenChainConfig},
37 util::{OracleResponseExt as _, RespondExt as _},
38 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
39 ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
40 OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
41 ResourceController, SystemMessage, SystemOperation, TransactionTracker, UserContractCode,
42 UserServiceCode,
43};
44
/// Actor that services [`ExecutionRequest`]s against a chain's execution state.
///
/// The actor does not own anything: it mutably borrows the state view, the transaction
/// tracker and the resource controller for the lifetime `'a`.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker that accumulates the effects of the transaction being executed, such as
    /// outgoing messages, events, created blobs and oracle responses.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to account for resource usage (blob reads, events, balances).
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
51
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-less latency histogram using the bucket layout shared by the
    /// metrics of this module.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram of the time spent in `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the time spent in `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
79
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
81
82impl<'a, C> ExecutionStateActor<'a, C>
83where
84 C: Context + Clone + 'static,
85 C::Extra: ExecutionRuntimeContext,
86{
87 pub fn new(
89 state: &'a mut ExecutionStateView<C>,
90 txn_tracker: &'a mut TransactionTracker,
91 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
92 ) -> Self {
93 Self {
94 state,
95 txn_tracker,
96 resource_controller,
97 }
98 }
99
100 #[instrument(skip_all, fields(application_id = %id))]
101 pub(crate) async fn load_contract(
102 &mut self,
103 id: ApplicationId,
104 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
105 #[cfg(with_metrics)]
106 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
107 let blob_id = id.description_blob_id();
108 let description = match self.txn_tracker.get_blob_content(&blob_id) {
109 Some(blob) => bcs::from_bytes(blob.bytes())?,
110 None => {
111 self.state
112 .system
113 .describe_application(id, self.txn_tracker)
114 .await?
115 }
116 };
117 let code = self
118 .state
119 .context()
120 .extra()
121 .get_user_contract(&description, self.txn_tracker)
122 .await?;
123 Ok((code, description))
124 }
125
126 pub(crate) async fn load_service(
127 &mut self,
128 id: ApplicationId,
129 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
130 #[cfg(with_metrics)]
131 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
132 let blob_id = id.description_blob_id();
133 let description = match self.txn_tracker.get_blob_content(&blob_id) {
134 Some(blob) => bcs::from_bytes(blob.bytes())?,
135 None => {
136 self.state
137 .system
138 .describe_application(id, self.txn_tracker)
139 .await?
140 }
141 };
142 let code = self
143 .state
144 .context()
145 .extra()
146 .get_user_service(&description, self.txn_tracker)
147 .await?;
148 Ok((code, description))
149 }
150
151 #[instrument(
153 skip_all,
154 fields(request_type = %request.as_ref())
155 )]
156 pub(crate) async fn handle_request(
157 &mut self,
158 request: ExecutionRequest,
159 ) -> Result<(), ExecutionError> {
160 use ExecutionRequest::*;
161 match request {
162 #[cfg(not(web))]
163 LoadContract { id, callback } => {
164 let (code, description) = self.load_contract(id).await?;
165 callback.respond((code, description))
166 }
167 #[cfg(not(web))]
168 LoadService { id, callback } => {
169 let (code, description) = self.load_service(id).await?;
170 callback.respond((code, description))
171 }
172
173 ChainBalance { callback } => {
174 let balance = *self.state.system.balance.get();
175 callback.respond(balance);
176 }
177
178 OwnerBalance { owner, callback } => {
179 let balance = self
180 .state
181 .system
182 .balances
183 .get(&owner)
184 .await?
185 .unwrap_or_default();
186 callback.respond(balance);
187 }
188
189 OwnerBalances { callback } => {
190 callback.respond(self.state.system.balances.index_values().await?);
191 }
192
193 BalanceOwners { callback } => {
194 let owners = self.state.system.balances.indices().await?;
195 callback.respond(owners);
196 }
197
198 Allowance {
199 owner,
200 spender,
201 callback,
202 } => {
203 let owner_spender = OwnerSpender::new(owner, spender);
204 let allowance = self
205 .state
206 .system
207 .allowances
208 .get(&owner_spender)
209 .await?
210 .unwrap_or_default();
211 callback.respond(allowance);
212 }
213
214 Allowances { callback } => {
215 let entries: Vec<_> = self
216 .state
217 .system
218 .allowances
219 .index_values()
220 .await?
221 .into_iter()
222 .map(|(os, amount)| (os.owner, os.spender, amount))
223 .collect();
224 callback.respond(entries);
225 }
226
227 Transfer {
228 source,
229 destination,
230 amount,
231 signer,
232 application_id,
233 callback,
234 } => {
235 let maybe_message = self
236 .state
237 .system
238 .transfer(signer, Some(application_id), source, destination, amount)
239 .await?;
240 self.txn_tracker.add_outgoing_messages(maybe_message);
241 callback.respond(());
242 }
243
244 Claim {
245 source,
246 destination,
247 amount,
248 signer,
249 application_id,
250 callback,
251 } => {
252 let maybe_message = self
253 .state
254 .system
255 .claim(
256 signer,
257 Some(application_id),
258 source.owner,
259 source.chain_id,
260 destination,
261 amount,
262 )
263 .await?;
264 self.txn_tracker.add_outgoing_messages(maybe_message);
265 callback.respond(());
266 }
267
268 Approve {
269 owner,
270 spender,
271 amount,
272 signer,
273 application_id,
274 callback,
275 } => {
276 self.state
277 .system
278 .approve(signer, Some(application_id), owner, spender, amount)
279 .await?;
280 callback.respond(());
281 }
282
283 TransferFrom {
284 owner,
285 spender,
286 destination,
287 amount,
288 signer,
289 application_id,
290 callback,
291 } => {
292 let maybe_message = self
293 .state
294 .system
295 .transfer_from(
296 signer,
297 Some(application_id),
298 owner,
299 spender,
300 destination,
301 amount,
302 )
303 .await?;
304 self.txn_tracker.add_outgoing_messages(maybe_message);
305 callback.respond(());
306 }
307
308 SystemTimestamp { callback } => {
309 let timestamp = *self.state.system.timestamp.get();
310 callback.respond(timestamp);
311 }
312
313 ChainOwnership { callback } => {
314 let ownership = self.state.system.ownership.get().await?.clone();
315 callback.respond(ownership);
316 }
317
318 ApplicationPermissions { callback } => {
319 let permissions = self
320 .state
321 .system
322 .application_permissions
323 .get()
324 .await?
325 .clone();
326 callback.respond(permissions);
327 }
328
329 ReadApplicationDescription {
330 application_id,
331 callback,
332 } => {
333 let blob_id = application_id.description_blob_id();
334 let description = match self.txn_tracker.get_blob_content(&blob_id) {
335 Some(blob) => bcs::from_bytes(blob.bytes())?,
336 None => {
337 let blob_content = self.state.system.read_blob_content(blob_id).await?;
338 self.state
339 .system
340 .blob_used(self.txn_tracker, blob_id)
341 .await?;
342 bcs::from_bytes(blob_content.bytes())?
343 }
344 };
345 callback.respond(description);
346 }
347
348 ContainsKey { id, key, callback } => {
349 let view = self.state.users.try_load_entry(&id).await?;
350 let result = match view {
351 Some(view) => view.contains_key(&key).await?,
352 None => false,
353 };
354 callback.respond(result);
355 }
356
357 ContainsKeys { id, keys, callback } => {
358 let view = self.state.users.try_load_entry(&id).await?;
359 let result = match view {
360 Some(view) => view.contains_keys(&keys).await?,
361 None => vec![false; keys.len()],
362 };
363 callback.respond(result);
364 }
365
366 ReadMultiValuesBytes { id, keys, callback } => {
367 let view = self.state.users.try_load_entry(&id).await?;
368 let values = match view {
369 Some(view) => view.multi_get(&keys).await?,
370 None => vec![None; keys.len()],
371 };
372 callback.respond(values);
373 }
374
375 ReadValueBytes { id, key, callback } => {
376 let view = self.state.users.try_load_entry(&id).await?;
377 let result = match view {
378 Some(view) => view.get(&key).await?,
379 None => None,
380 };
381 callback.respond(result);
382 }
383
384 FindKeysByPrefix {
385 id,
386 key_prefix,
387 callback,
388 } => {
389 let view = self.state.users.try_load_entry(&id).await?;
390 let result = match view {
391 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
392 None => Vec::new(),
393 };
394 callback.respond(result);
395 }
396
397 FindKeyValuesByPrefix {
398 id,
399 key_prefix,
400 callback,
401 } => {
402 let view = self.state.users.try_load_entry(&id).await?;
403 let result = match view {
404 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
405 None => Vec::new(),
406 };
407 callback.respond(result);
408 }
409
410 WriteBatch {
411 id,
412 batch,
413 callback,
414 } => {
415 let mut view = self.state.users.try_load_entry_mut(&id).await?;
416 view.write_batch(batch)?;
417 callback.respond(());
418 }
419
420 OpenChain {
421 ownership,
422 balance,
423 parent_id,
424 block_height,
425 application_permissions,
426 timestamp,
427 callback,
428 } => {
429 let config = OpenChainConfig {
430 ownership,
431 balance,
432 application_permissions,
433 };
434 let chain_id = self
435 .state
436 .system
437 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
438 .await?;
439 callback.respond(chain_id);
440 }
441
442 CloseChain {
443 application_id,
444 callback,
445 } => {
446 let app_permissions = self.state.system.application_permissions.get().await?;
447 if !app_permissions.can_manage_chain(&application_id) {
448 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
449 } else {
450 self.state.system.close_chain();
451 callback.respond(Ok(()));
452 }
453 }
454
455 ChangeOwnership {
456 application_id,
457 ownership,
458 callback,
459 } => {
460 let app_permissions = self.state.system.application_permissions.get().await?;
461 if !app_permissions.can_manage_chain(&application_id) {
462 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
463 } else {
464 self.state.system.ownership.set(ownership);
465 callback.respond(Ok(()));
466 }
467 }
468
469 ChangeApplicationPermissions {
470 application_id,
471 application_permissions,
472 callback,
473 } => {
474 let app_permissions = self.state.system.application_permissions.get().await?;
475 if !app_permissions.can_manage_chain(&application_id) {
476 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
477 } else {
478 self.state
479 .system
480 .application_permissions
481 .set(application_permissions);
482 callback.respond(Ok(()));
483 }
484 }
485
486 PeekApplicationIndex { callback } => {
487 let index = self.txn_tracker.peek_application_index();
488 callback.respond(index)
489 }
490
491 CreateApplication {
492 chain_id,
493 block_height,
494 module_id,
495 parameters,
496 required_application_ids,
497 callback,
498 } => {
499 let create_application_result = self
500 .state
501 .system
502 .create_application(
503 chain_id,
504 block_height,
505 module_id,
506 parameters,
507 required_application_ids,
508 self.txn_tracker,
509 )
510 .await?;
511 callback.respond(create_application_result);
512 }
513
514 PerformHttpRequest {
515 request,
516 http_responses_are_oracle_responses,
517 callback,
518 } => {
519 let system = &mut self.state.system;
520 let response = self
521 .txn_tracker
522 .oracle(|| async {
523 let headers = request
524 .headers
525 .into_iter()
526 .map(|http::Header { name, value }| {
527 Ok((name.parse()?, value.try_into()?))
528 })
529 .collect::<Result<HeaderMap, ExecutionError>>()?;
530
531 let url = Url::parse(&request.url)?;
532 let host = url
533 .host_str()
534 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
535
536 let (_epoch, committee) = system
537 .current_committee()
538 .await?
539 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
540 let allowed_hosts = &committee.policy().http_request_allow_list;
541
542 ensure!(
543 allowed_hosts.contains(host),
544 ExecutionError::UnauthorizedHttpRequest(url)
545 );
546
547 let request = Client::new()
548 .request(request.method.into(), url)
549 .body(request.body)
550 .headers(headers);
551 #[cfg(not(web))]
552 let request = request.timeout(linera_base::time::Duration::from_millis(
553 committee.policy().http_request_timeout_ms,
554 ));
555
556 let response = request.send().await?;
557
558 let mut response_size_limit =
559 committee.policy().maximum_http_response_bytes;
560
561 if http_responses_are_oracle_responses {
562 response_size_limit = response_size_limit
563 .min(committee.policy().maximum_oracle_response_bytes);
564 }
565 Ok(OracleResponse::Http(
566 Self::receive_http_response(response, response_size_limit).await?,
567 ))
568 })
569 .await?
570 .to_http_response()?;
571 callback.respond(response);
572 }
573
574 ReadBlobContent { blob_id, callback } => {
575 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
576 content.clone()
577 } else {
578 let content = self.state.system.read_blob_content(blob_id).await?;
579 if blob_id.blob_type == BlobType::Data {
580 self.resource_controller
581 .with_state(&mut self.state.system)
582 .await?
583 .track_blob_read(content.bytes().len() as u64)?;
584 }
585 self.state
586 .system
587 .blob_used(self.txn_tracker, blob_id)
588 .await?;
589 content
590 };
591 callback.respond(content)
592 }
593
594 AssertBlobExists { blob_id, callback } => {
595 self.state.system.assert_blob_exists(blob_id).await?;
596 if blob_id.blob_type == BlobType::Data {
598 self.resource_controller
599 .with_state(&mut self.state.system)
600 .await?
601 .track_blob_read(0)?;
602 }
603 let is_new = self
604 .state
605 .system
606 .blob_used(self.txn_tracker, blob_id)
607 .await?;
608 if is_new {
609 self.txn_tracker
610 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
611 }
612 callback.respond(());
613 }
614
615 Emit {
616 stream_id,
617 value,
618 callback,
619 } => {
620 let count = self
621 .state
622 .system
623 .stream_event_counts
624 .get_mut_or_default(&stream_id)
625 .await?;
626 let index = *count;
627 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
628 self.resource_controller
629 .with_state(&mut self.state.system)
630 .await?
631 .track_event_published(&value)?;
632 self.txn_tracker.add_event(stream_id, index, value);
633 callback.respond(index)
634 }
635
636 ReadEvent { event_id, callback } => {
637 let context = self.state.context();
638 let extra = context.extra();
639 let event = self
640 .txn_tracker
641 .oracle(|| async {
642 let event = extra
643 .get_event(event_id.clone())
644 .await?
645 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
646 Ok(OracleResponse::Event(
647 event_id.clone(),
648 Arc::unwrap_or_clone(event),
649 ))
650 })
651 .await?
652 .to_event(&event_id)?;
653 self.resource_controller
654 .with_state(&mut self.state.system)
655 .await?
656 .track_event_read(event.len() as u64)?;
657 callback.respond(event);
658 }
659
660 SubscribeToEvents {
661 chain_id,
662 stream_id,
663 subscriber_app_id,
664 callback,
665 } => {
666 let subscriptions = self
667 .state
668 .system
669 .event_subscriptions
670 .get_mut_or_default(&(chain_id, stream_id.clone()))
671 .await?;
672 let next_index = match subscriptions.applications.entry(subscriber_app_id) {
673 std::collections::btree_map::Entry::Vacant(entry) => {
674 entry.insert(0);
675 subscriptions.min_next_index = 0;
676 0
677 }
678 std::collections::btree_map::Entry::Occupied(entry) => *entry.get(),
679 };
680 self.txn_tracker.add_stream_to_process(
681 subscriber_app_id,
682 chain_id,
683 stream_id,
684 0,
685 next_index,
686 );
687 callback.respond(());
688 }
689
690 UnsubscribeFromEvents {
691 chain_id,
692 stream_id,
693 subscriber_app_id,
694 callback,
695 } => {
696 let key = (chain_id, stream_id.clone());
697 let subscriptions = self
698 .state
699 .system
700 .event_subscriptions
701 .get_mut_or_default(&key)
702 .await?;
703 subscriptions.applications.remove(&subscriber_app_id);
704 if subscriptions.applications.is_empty() {
705 self.state.system.event_subscriptions.remove(&key)?;
706 } else {
707 subscriptions.recalculate_min();
708 }
709 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
710 self.txn_tracker
711 .remove_stream_to_process(app_id, chain_id, stream_id);
712 }
713 callback.respond(());
714 }
715
716 GetApplicationPermissions { callback } => {
717 let app_permissions = self.state.system.application_permissions.get().await?;
718 callback.respond(app_permissions.clone());
719 }
720
721 QueryServiceOracle {
722 deadline,
723 application_id,
724 next_block_height,
725 query,
726 callback,
727 } => {
728 let state = &mut self.state;
729 let local_time = self.txn_tracker.local_time();
730 let created_blobs = self.txn_tracker.created_blobs().clone();
731 let bytes = self
732 .txn_tracker
733 .oracle(|| async {
734 let context = QueryContext {
735 chain_id: state.context().extra().chain_id(),
736 next_block_height,
737 local_time,
738 };
739 let QueryOutcome {
740 response,
741 operations,
742 } = Box::pin(state.query_user_application_with_deadline(
743 application_id,
744 context,
745 query,
746 deadline,
747 created_blobs,
748 ))
749 .await?;
750 ensure!(
751 operations.is_empty(),
752 ExecutionError::ServiceOracleQueryOperations(operations)
753 );
754 Ok(OracleResponse::Service(response))
755 })
756 .await?
757 .to_service_response()?;
758 callback.respond(bytes);
759 }
760
761 AddOutgoingMessage { message, callback } => {
762 self.txn_tracker.add_outgoing_message(message);
763 callback.respond(());
764 }
765
766 SetLocalTime {
767 local_time,
768 callback,
769 } => {
770 self.txn_tracker.set_local_time(local_time);
771 callback.respond(());
772 }
773
774 AssertBefore {
775 timestamp,
776 callback,
777 } => {
778 let result = if !self
779 .txn_tracker
780 .replay_oracle_response(OracleResponse::Assert)?
781 {
782 let local_time = self.txn_tracker.local_time();
784 if local_time >= timestamp {
785 Err(ExecutionError::AssertBefore {
786 timestamp,
787 local_time,
788 })
789 } else {
790 Ok(())
791 }
792 } else {
793 Ok(())
794 };
795 callback.respond(result);
796 }
797
798 AddCreatedBlob { blob, callback } => {
799 if self.resource_controller.is_free {
800 self.txn_tracker.mark_blob_free(blob.id());
801 }
802 self.txn_tracker.add_created_blob(blob);
803 callback.respond(());
804 }
805
806 ValidationRound { round, callback } => {
807 let validation_round = self
808 .txn_tracker
809 .oracle(|| async { Ok(OracleResponse::Round(round)) })
810 .await?
811 .to_round()?;
812 callback.respond(validation_round);
813 }
814
815 HasEmptyStorage {
816 application,
817 callback,
818 } => {
819 let view = self.state.users.try_load_entry(&application).await?;
820 let result = match view {
821 Some(view) => view.count().await? == 0,
822 None => true,
823 };
824 callback.respond(result);
825 }
826
827 AllowApplicationLogs { callback } => {
828 let allow = self
829 .state
830 .context()
831 .extra()
832 .execution_runtime_config()
833 .allow_application_logs;
834 callback.respond(allow);
835 }
836
837 #[cfg(web)]
838 Log { message, level } => match level {
839 tracing::log::Level::Trace | tracing::log::Level::Debug => {
840 tracing::debug!(target: "user_application_log", message = %message);
841 }
842 tracing::log::Level::Info => {
843 tracing::info!(target: "user_application_log", message = %message);
844 }
845 tracing::log::Level::Warn => {
846 tracing::warn!(target: "user_application_log", message = %message);
847 }
848 tracing::log::Level::Error => {
849 tracing::error!(target: "user_application_log", message = %message);
850 }
851 },
852 }
853
854 Ok(())
855 }
856
857 #[instrument(skip_all)]
860 async fn process_subscriptions(
861 &mut self,
862 context: ProcessStreamsContext,
863 ) -> Result<(), ExecutionError> {
864 let mut processed = BTreeSet::new();
867 loop {
868 let to_process = self
869 .txn_tracker
870 .take_streams_to_process()
871 .into_iter()
872 .filter_map(|(app_id, updates)| {
873 let updates = updates
874 .into_iter()
875 .filter_map(|update| {
876 if !processed.insert((
877 app_id,
878 update.chain_id,
879 update.stream_id.clone(),
880 )) {
881 return None;
882 }
883 Some(update)
884 })
885 .collect::<Vec<_>>();
886 if updates.is_empty() {
887 return None;
888 }
889 Some((app_id, updates))
890 })
891 .collect::<BTreeMap<_, _>>();
892 if to_process.is_empty() {
893 return Ok(());
894 }
895 for (app_id, updates) in to_process {
896 self.run_user_action(
897 app_id,
898 UserAction::ProcessStreams(context, updates),
899 None,
900 None,
901 )
902 .await?;
903 }
904 }
905 }
906
907 async fn summarize_events_at_checkpoint(
919 &mut self,
920 context: OperationContext,
921 ) -> Result<(), ExecutionError> {
922 let mut updates_by_app = BTreeMap::<ApplicationId, Vec<StreamUpdate>>::new();
923 for stream_id in self.state.previous_event_blocks.indices().await? {
924 let GenericApplicationId::User(application_id) = stream_id.application_id else {
925 continue;
926 };
927 let next_index = self
928 .state
929 .system
930 .stream_event_counts
931 .get(&stream_id)
932 .await?
933 .unwrap_or(0);
934 updates_by_app
937 .entry(application_id)
938 .or_default()
939 .push(StreamUpdate {
940 chain_id: context.chain_id,
941 stream_id,
942 previous_index: 0,
943 next_index,
944 });
945 }
946
947 self.state.previous_event_blocks.clear();
950
951 let process_context = ProcessStreamsContext::from(context);
952 for (application_id, updates) in updates_by_app {
953 self.run_user_action(
954 application_id,
955 UserAction::SummarizeEvents(process_context, updates),
956 None,
957 None,
958 )
959 .await?;
960 }
961 Ok(())
962 }
963
964 pub(crate) async fn run_user_action(
965 &mut self,
966 application_id: ApplicationId,
967 action: UserAction,
968 refund_grant_to: Option<Account>,
969 grant: Option<&mut Amount>,
970 ) -> Result<(), ExecutionError> {
971 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
972 .await
973 }
974
975 pub(crate) async fn service_and_dependencies(
977 &mut self,
978 application: ApplicationId,
979 ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
980 let mut stack = vec![application];
983 let mut codes = vec![];
984 let mut descriptions = vec![];
985
986 while let Some(id) = stack.pop() {
987 let (code, description) = self.load_service(id).await?;
988 stack.extend(description.required_application_ids.iter().rev().copied());
989 codes.push(code);
990 descriptions.push(description);
991 }
992
993 codes.reverse();
994 descriptions.reverse();
995
996 Ok((codes, descriptions))
997 }
998
999 #[instrument(skip_all, fields(application_id = %application))]
1001 async fn contract_and_dependencies(
1002 &mut self,
1003 application: ApplicationId,
1004 ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
1005 let mut stack = vec![application];
1008 let mut codes = vec![];
1009 let mut descriptions = vec![];
1010
1011 while let Some(id) = stack.pop() {
1012 let (code, description) = self.load_contract(id).await?;
1013 stack.extend(description.required_application_ids.iter().rev().copied());
1014 codes.push(code);
1015 descriptions.push(description);
1016 }
1017
1018 codes.reverse();
1019 descriptions.reverse();
1020
1021 Ok((codes, descriptions))
1022 }
1023
1024 #[instrument(skip_all, fields(application_id = %application_id))]
1025 async fn run_user_action_with_runtime(
1026 &mut self,
1027 application_id: ApplicationId,
1028 action: UserAction,
1029 refund_grant_to: Option<Account>,
1030 grant: Option<&mut Amount>,
1031 ) -> Result<(), ExecutionError> {
1032 let chain_id = self.state.context().extra().chain_id();
1033 let mut cloned_grant = grant.as_ref().map(|x| **x);
1034 let initial_balance = self
1035 .resource_controller
1036 .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
1037 .await?
1038 .balance()?;
1039 let mut controller = ResourceController::new(
1040 self.resource_controller.policy().clone(),
1041 self.resource_controller.tracker,
1042 initial_balance,
1043 );
1044 let is_free = matches!(
1045 &action,
1046 UserAction::Message(..)
1047 | UserAction::ProcessStreams(..)
1048 | UserAction::SummarizeEvents(..)
1049 ) && self
1050 .resource_controller
1051 .policy()
1052 .is_free_app(&application_id);
1053 controller.is_free = is_free;
1054 self.resource_controller.is_free = is_free;
1055 let (execution_state_sender, mut execution_state_receiver) =
1056 futures::channel::mpsc::unbounded();
1057
1058 let (codes, descriptions): (Vec<_>, Vec<_>) =
1059 self.contract_and_dependencies(application_id).await?;
1060
1061 let allow_application_logs = self
1062 .state
1063 .context()
1064 .extra()
1065 .execution_runtime_config()
1066 .allow_application_logs;
1067
1068 let contract_runtime_task = self
1069 .state
1070 .context()
1071 .extra()
1072 .thread_pool()
1073 .run_send(JsVec(codes), move |codes| async move {
1074 let runtime = ContractSyncRuntime::new(
1075 execution_state_sender,
1076 chain_id,
1077 refund_grant_to,
1078 controller,
1079 &action,
1080 allow_application_logs,
1081 );
1082
1083 for (code, description) in codes.0.into_iter().zip(descriptions) {
1084 runtime.preload_contract(ApplicationId::from(&description), code, description);
1085 }
1086
1087 runtime.run_action(application_id, chain_id, action)
1088 })
1089 .await;
1090
1091 async {
1092 while let Some(request) = execution_state_receiver.next().await {
1093 self.handle_request(request).await?;
1094 }
1095 Ok::<(), ExecutionError>(())
1096 }
1097 .instrument(info_span!("handle_runtime_requests"))
1098 .await?;
1099
1100 let (result, controller) = contract_runtime_task.await??;
1101
1102 self.resource_controller.is_free = false;
1103
1104 self.txn_tracker.add_operation_result(result);
1105
1106 self.resource_controller
1107 .with_state_and_grant(&mut self.state.system, grant)
1108 .await?
1109 .merge_balance(initial_balance, controller.balance()?)?;
1110 self.resource_controller.tracker = controller.tracker;
1111
1112 Ok(())
1113 }
1114
1115 #[instrument(skip_all, fields(
1116 chain_id = %context.chain_id,
1117 block_height = %context.height,
1118 operation_type = %operation.as_ref(),
1119 ))]
1120 pub async fn execute_operation(
1122 &mut self,
1123 context: OperationContext,
1124 operation: Operation,
1125 ) -> Result<(), ExecutionError> {
1126 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1127 match operation {
1128 Operation::System(op) => match *op {
1129 SystemOperation::Checkpoint => {
1130 let prepared = self.txn_tracker.take_prepared_checkpoint().ok_or(
1131 ExecutionError::CheckpointPreconditionFailed(
1132 "Checkpoint operation reached the actor without prepared inputs; \
1133 the chain-level pre-block hook must run prepare_checkpoint first",
1134 ),
1135 )?;
1136 self.state
1137 .apply_checkpoint(prepared, self.txn_tracker)
1138 .await?;
1139 self.summarize_events_at_checkpoint(context).await?;
1140 }
1141 op => {
1142 let new_application = self
1143 .state
1144 .system
1145 .execute_operation(context, op, self.txn_tracker, self.resource_controller)
1146 .await?;
1147 if let Some((application_id, argument)) = new_application {
1148 let user_action = UserAction::Instantiate(context, argument);
1149 self.run_user_action(
1150 application_id,
1151 user_action,
1152 context.refund_grant_to(),
1153 None,
1154 )
1155 .await?;
1156 }
1157 }
1158 },
1159 Operation::User {
1160 application_id,
1161 bytes,
1162 } => {
1163 self.run_user_action(
1164 application_id,
1165 UserAction::Operation(context, bytes),
1166 context.refund_grant_to(),
1167 None,
1168 )
1169 .await?;
1170 }
1171 }
1172 self.process_subscriptions(context.into()).await?;
1173 Ok(())
1174 }
1175
1176 #[instrument(skip_all, fields(
1177 chain_id = %context.chain_id,
1178 block_height = %context.height,
1179 origin = %context.origin,
1180 is_bouncing = %context.is_bouncing,
1181 message_type = %message.as_ref(),
1182 ))]
1183 pub async fn execute_message(
1185 &mut self,
1186 context: MessageContext,
1187 message: Message,
1188 grant: Option<&mut Amount>,
1189 ) -> Result<(), ExecutionError> {
1190 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1191 match message {
1192 Message::System(message) => {
1193 let outcome = self.state.system.execute_message(context, message).await?;
1194 self.txn_tracker.add_outgoing_messages(outcome);
1195 }
1196 Message::User {
1197 application_id,
1198 bytes,
1199 } => {
1200 self.run_user_action(
1201 application_id,
1202 UserAction::Message(context, bytes),
1203 context.refund_grant_to,
1204 grant,
1205 )
1206 .await?;
1207 }
1208 }
1209 self.process_subscriptions(context.into()).await?;
1210 Ok(())
1211 }
1212
1213 pub fn bounce_message(
1215 &mut self,
1216 context: MessageContext,
1217 grant: Amount,
1218 message: Message,
1219 ) -> Result<(), ExecutionError> {
1220 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1221 self.txn_tracker.add_outgoing_message(OutgoingMessage {
1222 destination: context.origin,
1223 authenticated_owner: context.authenticated_owner,
1224 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1225 grant,
1226 kind: MessageKind::Bouncing,
1227 message,
1228 });
1229 Ok(())
1230 }
1231
1232 pub fn send_refund(
1234 &mut self,
1235 context: MessageContext,
1236 amount: Amount,
1237 ) -> Result<(), ExecutionError> {
1238 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1239 if amount.is_zero() {
1240 return Ok(());
1241 }
1242 let Some(account) = context.refund_grant_to else {
1243 return Err(ExecutionError::InternalError(
1244 "Messages with grants should have a non-empty `refund_grant_to`",
1245 ));
1246 };
1247 let message = SystemMessage::Credit {
1248 amount,
1249 source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1250 target: account.owner,
1251 };
1252 self.txn_tracker.add_outgoing_message(
1253 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1254 );
1255 Ok(())
1256 }
1257
1258 async fn receive_http_response(
1262 response: reqwest::Response,
1263 size_limit: u64,
1264 ) -> Result<http::Response, ExecutionError> {
1265 let status = response.status().as_u16();
1266 let maybe_content_length = response.content_length();
1267
1268 let headers = response
1269 .headers()
1270 .iter()
1271 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1272 .collect::<Vec<_>>();
1273
1274 let total_header_size = headers
1275 .iter()
1276 .map(|header| (header.name.len() + header.value.len()) as u64)
1277 .sum();
1278
1279 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1280 ExecutionError::HttpResponseSizeLimitExceeded {
1281 limit: size_limit,
1282 size: total_header_size,
1283 },
1284 )?;
1285
1286 if let Some(content_length) = maybe_content_length {
1287 if content_length > remaining_bytes {
1288 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1289 limit: size_limit,
1290 size: content_length + total_header_size,
1291 });
1292 }
1293 }
1294
1295 let mut body = Vec::with_capacity(
1296 usize::try_from(maybe_content_length.unwrap_or(0)).unwrap_or(usize::MAX),
1297 );
1298 let mut body_stream = response.bytes_stream();
1299
1300 while let Some(bytes) = body_stream.next().await.transpose()? {
1301 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1302 ExecutionError::HttpResponseSizeLimitExceeded {
1303 limit: size_limit,
1304 size: bytes.len() as u64 + (size_limit - remaining_bytes),
1305 },
1306 )?;
1307
1308 body.extend(&bytes);
1309 }
1310
1311 Ok(http::Response {
1312 status,
1313 headers,
1314 body,
1315 })
1316 }
1317}
1318
1319#[derive(Debug, strum::AsRefStr)]
1321#[allow(missing_docs)]
1322pub enum ExecutionRequest {
1323 #[cfg(not(web))]
1324 LoadContract {
1325 id: ApplicationId,
1326 #[debug(skip)]
1327 callback: Sender<(UserContractCode, ApplicationDescription)>,
1328 },
1329
1330 #[cfg(not(web))]
1331 LoadService {
1332 id: ApplicationId,
1333 #[debug(skip)]
1334 callback: Sender<(UserServiceCode, ApplicationDescription)>,
1335 },
1336
1337 ChainBalance {
1338 #[debug(skip)]
1339 callback: Sender<Amount>,
1340 },
1341
1342 OwnerBalance {
1343 owner: AccountOwner,
1344 #[debug(skip)]
1345 callback: Sender<Amount>,
1346 },
1347
1348 OwnerBalances {
1349 #[debug(skip)]
1350 callback: Sender<Vec<(AccountOwner, Amount)>>,
1351 },
1352
1353 BalanceOwners {
1354 #[debug(skip)]
1355 callback: Sender<Vec<AccountOwner>>,
1356 },
1357
1358 Allowance {
1359 owner: AccountOwner,
1360 spender: AccountOwner,
1361 #[debug(skip)]
1362 callback: Sender<Amount>,
1363 },
1364
1365 Allowances {
1366 #[debug(skip)]
1367 callback: Sender<Vec<(AccountOwner, AccountOwner, Amount)>>,
1368 },
1369
1370 Transfer {
1371 source: AccountOwner,
1372 destination: Account,
1373 amount: Amount,
1374 #[debug(skip_if = Option::is_none)]
1375 signer: Option<AccountOwner>,
1376 application_id: ApplicationId,
1377 #[debug(skip)]
1378 callback: Sender<()>,
1379 },
1380
1381 Claim {
1382 source: Account,
1383 destination: Account,
1384 amount: Amount,
1385 #[debug(skip_if = Option::is_none)]
1386 signer: Option<AccountOwner>,
1387 application_id: ApplicationId,
1388 #[debug(skip)]
1389 callback: Sender<()>,
1390 },
1391
1392 Approve {
1393 owner: AccountOwner,
1394 spender: AccountOwner,
1395 amount: Amount,
1396 #[debug(skip_if = Option::is_none)]
1397 signer: Option<AccountOwner>,
1398 application_id: ApplicationId,
1399 #[debug(skip)]
1400 callback: Sender<()>,
1401 },
1402
1403 TransferFrom {
1404 owner: AccountOwner,
1405 spender: AccountOwner,
1406 destination: Account,
1407 amount: Amount,
1408 #[debug(skip_if = Option::is_none)]
1409 signer: Option<AccountOwner>,
1410 application_id: ApplicationId,
1411 #[debug(skip)]
1412 callback: Sender<()>,
1413 },
1414
1415 SystemTimestamp {
1416 #[debug(skip)]
1417 callback: Sender<Timestamp>,
1418 },
1419
1420 ChainOwnership {
1421 #[debug(skip)]
1422 callback: Sender<ChainOwnership>,
1423 },
1424
1425 ApplicationPermissions {
1426 #[debug(skip)]
1427 callback: Sender<ApplicationPermissions>,
1428 },
1429
1430 ReadApplicationDescription {
1431 application_id: ApplicationId,
1432 #[debug(skip)]
1433 callback: Sender<ApplicationDescription>,
1434 },
1435
1436 ReadValueBytes {
1437 id: ApplicationId,
1438 #[debug(with = hex_debug)]
1439 key: Vec<u8>,
1440 #[debug(skip)]
1441 callback: Sender<Option<Vec<u8>>>,
1442 },
1443
1444 ContainsKey {
1445 id: ApplicationId,
1446 key: Vec<u8>,
1447 #[debug(skip)]
1448 callback: Sender<bool>,
1449 },
1450
1451 ContainsKeys {
1452 id: ApplicationId,
1453 #[debug(with = hex_vec_debug)]
1454 keys: Vec<Vec<u8>>,
1455 callback: Sender<Vec<bool>>,
1456 },
1457
1458 ReadMultiValuesBytes {
1459 id: ApplicationId,
1460 #[debug(with = hex_vec_debug)]
1461 keys: Vec<Vec<u8>>,
1462 #[debug(skip)]
1463 callback: Sender<Vec<Option<Vec<u8>>>>,
1464 },
1465
1466 FindKeysByPrefix {
1467 id: ApplicationId,
1468 #[debug(with = hex_debug)]
1469 key_prefix: Vec<u8>,
1470 #[debug(skip)]
1471 callback: Sender<Vec<Vec<u8>>>,
1472 },
1473
1474 FindKeyValuesByPrefix {
1475 id: ApplicationId,
1476 #[debug(with = hex_debug)]
1477 key_prefix: Vec<u8>,
1478 #[debug(skip)]
1479 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1480 },
1481
1482 WriteBatch {
1483 id: ApplicationId,
1484 batch: Batch,
1485 #[debug(skip)]
1486 callback: Sender<()>,
1487 },
1488
1489 OpenChain {
1490 ownership: ChainOwnership,
1491 #[debug(skip_if = Amount::is_zero)]
1492 balance: Amount,
1493 parent_id: ChainId,
1494 block_height: BlockHeight,
1495 application_permissions: ApplicationPermissions,
1496 timestamp: Timestamp,
1497 #[debug(skip)]
1498 callback: Sender<ChainId>,
1499 },
1500
1501 CloseChain {
1502 application_id: ApplicationId,
1503 #[debug(skip)]
1504 callback: Sender<Result<(), ExecutionError>>,
1505 },
1506
1507 ChangeOwnership {
1508 application_id: ApplicationId,
1509 ownership: ChainOwnership,
1510 #[debug(skip)]
1511 callback: Sender<Result<(), ExecutionError>>,
1512 },
1513
1514 ChangeApplicationPermissions {
1515 application_id: ApplicationId,
1516 application_permissions: ApplicationPermissions,
1517 #[debug(skip)]
1518 callback: Sender<Result<(), ExecutionError>>,
1519 },
1520
1521 PeekApplicationIndex {
1522 #[debug(skip)]
1523 callback: Sender<u32>,
1524 },
1525
1526 CreateApplication {
1527 chain_id: ChainId,
1528 block_height: BlockHeight,
1529 module_id: ModuleId,
1530 parameters: Vec<u8>,
1531 required_application_ids: Vec<ApplicationId>,
1532 #[debug(skip)]
1533 callback: Sender<CreateApplicationResult>,
1534 },
1535
1536 PerformHttpRequest {
1537 request: http::Request,
1538 http_responses_are_oracle_responses: bool,
1539 #[debug(skip)]
1540 callback: Sender<http::Response>,
1541 },
1542
1543 ReadBlobContent {
1544 blob_id: BlobId,
1545 #[debug(skip)]
1546 callback: Sender<BlobContent>,
1547 },
1548
1549 AssertBlobExists {
1550 blob_id: BlobId,
1551 #[debug(skip)]
1552 callback: Sender<()>,
1553 },
1554
1555 Emit {
1556 stream_id: StreamId,
1557 #[debug(with = hex_debug)]
1558 value: Vec<u8>,
1559 #[debug(skip)]
1560 callback: Sender<u32>,
1561 },
1562
1563 ReadEvent {
1564 event_id: EventId,
1565 callback: oneshot::Sender<Vec<u8>>,
1566 },
1567
1568 SubscribeToEvents {
1569 chain_id: ChainId,
1570 stream_id: StreamId,
1571 subscriber_app_id: ApplicationId,
1572 #[debug(skip)]
1573 callback: Sender<()>,
1574 },
1575
1576 UnsubscribeFromEvents {
1577 chain_id: ChainId,
1578 stream_id: StreamId,
1579 subscriber_app_id: ApplicationId,
1580 #[debug(skip)]
1581 callback: Sender<()>,
1582 },
1583
1584 GetApplicationPermissions {
1585 #[debug(skip)]
1586 callback: Sender<ApplicationPermissions>,
1587 },
1588
1589 QueryServiceOracle {
1590 deadline: Option<Instant>,
1591 application_id: ApplicationId,
1592 next_block_height: BlockHeight,
1593 query: Vec<u8>,
1594 #[debug(skip)]
1595 callback: Sender<Vec<u8>>,
1596 },
1597
1598 AddOutgoingMessage {
1599 message: crate::OutgoingMessage,
1600 #[debug(skip)]
1601 callback: Sender<()>,
1602 },
1603
1604 SetLocalTime {
1605 local_time: Timestamp,
1606 #[debug(skip)]
1607 callback: Sender<()>,
1608 },
1609
1610 AssertBefore {
1611 timestamp: Timestamp,
1612 #[debug(skip)]
1613 callback: Sender<Result<(), ExecutionError>>,
1614 },
1615
1616 AddCreatedBlob {
1617 blob: crate::Blob,
1618 #[debug(skip)]
1619 callback: Sender<()>,
1620 },
1621
1622 ValidationRound {
1623 round: Option<u32>,
1624 #[debug(skip)]
1625 callback: Sender<Option<u32>>,
1626 },
1627
1628 HasEmptyStorage {
1629 application: ApplicationId,
1630 #[debug(skip)]
1631 callback: Sender<bool>,
1632 },
1633
1634 AllowApplicationLogs {
1635 #[debug(skip)]
1636 callback: Sender<bool>,
1637 },
1638
1639 #[cfg(web)]
1641 Log {
1642 message: String,
1643 level: tracing::log::Level,
1644 },
1645}