1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20 time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27 execution::UserAction,
28 runtime::ContractSyncRuntime,
29 system::{CreateApplicationResult, OpenChainConfig},
30 util::{OracleResponseExt as _, RespondExt as _},
31 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeConfig,
32 ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, MessageKind, ModuleId,
33 Operation, OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext,
34 QueryOutcome, ResourceController, SystemMessage, TransactionTracker, UserContractCode,
35 UserServiceCode,
36};
37
/// Actor that serves [`ExecutionRequest`]s against a chain's execution state.
///
/// It mutably borrows, for the lifetime `'a`, the three pieces of state that request
/// handling reads and updates.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests are answered from and applied to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker receiving outgoing messages, events, created blobs and oracle responses.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to track resource usage (blob reads, events) and balances.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
44
/// Latency histograms for loading user code; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram registered as `load_contract_latency`; measured in `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_contract_latency",
            "Load contract latency",
            // NOTE(review): presumably the (empty) list of label names — confirm.
            &[],
            exponential_bucket_latencies(250.0),
        )
    });

    /// Histogram registered as `load_service_latency`; measured in `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_service_latency",
            "Load service latency",
            // NOTE(review): presumably the (empty) list of label names — confirm.
            &[],
            exponential_bucket_latencies(250.0),
        )
    });
}
72
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77 C: Context + Clone + Send + Sync + 'static,
78 C::Extra: ExecutionRuntimeContext,
79{
80 pub fn new(
82 state: &'a mut ExecutionStateView<C>,
83 txn_tracker: &'a mut TransactionTracker,
84 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85 ) -> Self {
86 Self {
87 state,
88 txn_tracker,
89 resource_controller,
90 }
91 }
92
93 pub(crate) async fn load_contract(
94 &mut self,
95 id: ApplicationId,
96 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
97 #[cfg(with_metrics)]
98 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
99 let blob_id = id.description_blob_id();
100 let description = match self.txn_tracker.get_blob_content(&blob_id) {
101 Some(blob) => bcs::from_bytes(blob.bytes())?,
102 None => {
103 self.state
104 .system
105 .describe_application(id, self.txn_tracker)
106 .await?
107 }
108 };
109 let code = self
110 .state
111 .context()
112 .extra()
113 .get_user_contract(&description, self.txn_tracker)
114 .await?;
115 Ok((code, description))
116 }
117
118 pub(crate) async fn load_service(
119 &mut self,
120 id: ApplicationId,
121 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
122 #[cfg(with_metrics)]
123 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
124 let blob_id = id.description_blob_id();
125 let description = match self.txn_tracker.get_blob_content(&blob_id) {
126 Some(blob) => bcs::from_bytes(blob.bytes())?,
127 None => {
128 self.state
129 .system
130 .describe_application(id, self.txn_tracker)
131 .await?
132 }
133 };
134 let code = self
135 .state
136 .context()
137 .extra()
138 .get_user_service(&description, self.txn_tracker)
139 .await?;
140 Ok((code, description))
141 }
142
143 pub(crate) async fn handle_request(
145 &mut self,
146 request: ExecutionRequest,
147 ) -> Result<(), ExecutionError> {
148 use ExecutionRequest::*;
149 match request {
150 #[cfg(not(web))]
151 LoadContract { id, callback } => {
152 let (code, description) = self.load_contract(id).await?;
153 callback.respond((code, description))
154 }
155 #[cfg(not(web))]
156 LoadService { id, callback } => {
157 let (code, description) = self.load_service(id).await?;
158 callback.respond((code, description))
159 }
160
161 ChainBalance { callback } => {
162 let balance = *self.state.system.balance.get();
163 callback.respond(balance);
164 }
165
166 OwnerBalance { owner, callback } => {
167 let balance = self
168 .state
169 .system
170 .balances
171 .get(&owner)
172 .await?
173 .unwrap_or_default();
174 callback.respond(balance);
175 }
176
177 OwnerBalances { callback } => {
178 callback.respond(self.state.system.balances.index_values().await?);
179 }
180
181 BalanceOwners { callback } => {
182 let owners = self.state.system.balances.indices().await?;
183 callback.respond(owners);
184 }
185
186 Transfer {
187 source,
188 destination,
189 amount,
190 signer,
191 application_id,
192 callback,
193 } => {
194 let maybe_message = self
195 .state
196 .system
197 .transfer(signer, Some(application_id), source, destination, amount)
198 .await?;
199 self.txn_tracker.add_outgoing_messages(maybe_message);
200 callback.respond(());
201 }
202
203 Claim {
204 source,
205 destination,
206 amount,
207 signer,
208 application_id,
209 callback,
210 } => {
211 let maybe_message = self
212 .state
213 .system
214 .claim(
215 signer,
216 Some(application_id),
217 source.owner,
218 source.chain_id,
219 destination,
220 amount,
221 )
222 .await?;
223 self.txn_tracker.add_outgoing_messages(maybe_message);
224 callback.respond(());
225 }
226
227 SystemTimestamp { callback } => {
228 let timestamp = *self.state.system.timestamp.get();
229 callback.respond(timestamp);
230 }
231
232 ChainOwnership { callback } => {
233 let ownership = self.state.system.ownership.get().clone();
234 callback.respond(ownership);
235 }
236
237 ContainsKey { id, key, callback } => {
238 let view = self.state.users.try_load_entry(&id).await?;
239 let result = match view {
240 Some(view) => view.contains_key(&key).await?,
241 None => false,
242 };
243 callback.respond(result);
244 }
245
246 ContainsKeys { id, keys, callback } => {
247 let view = self.state.users.try_load_entry(&id).await?;
248 let result = match view {
249 Some(view) => view.contains_keys(keys).await?,
250 None => vec![false; keys.len()],
251 };
252 callback.respond(result);
253 }
254
255 ReadMultiValuesBytes { id, keys, callback } => {
256 let view = self.state.users.try_load_entry(&id).await?;
257 let values = match view {
258 Some(view) => view.multi_get(keys).await?,
259 None => vec![None; keys.len()],
260 };
261 callback.respond(values);
262 }
263
264 ReadValueBytes { id, key, callback } => {
265 let view = self.state.users.try_load_entry(&id).await?;
266 let result = match view {
267 Some(view) => view.get(&key).await?,
268 None => None,
269 };
270 callback.respond(result);
271 }
272
273 FindKeysByPrefix {
274 id,
275 key_prefix,
276 callback,
277 } => {
278 let view = self.state.users.try_load_entry(&id).await?;
279 let result = match view {
280 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
281 None => Vec::new(),
282 };
283 callback.respond(result);
284 }
285
286 FindKeyValuesByPrefix {
287 id,
288 key_prefix,
289 callback,
290 } => {
291 let view = self.state.users.try_load_entry(&id).await?;
292 let result = match view {
293 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
294 None => Vec::new(),
295 };
296 callback.respond(result);
297 }
298
299 WriteBatch {
300 id,
301 batch,
302 callback,
303 } => {
304 let mut view = self.state.users.try_load_entry_mut(&id).await?;
305 view.write_batch(batch).await?;
306 callback.respond(());
307 }
308
309 OpenChain {
310 ownership,
311 balance,
312 parent_id,
313 block_height,
314 application_permissions,
315 timestamp,
316 callback,
317 } => {
318 let config = OpenChainConfig {
319 ownership,
320 balance,
321 application_permissions,
322 };
323 let chain_id = self
324 .state
325 .system
326 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
327 .await?;
328 callback.respond(chain_id);
329 }
330
331 CloseChain {
332 application_id,
333 callback,
334 } => {
335 let app_permissions = self.state.system.application_permissions.get();
336 if !app_permissions.can_close_chain(&application_id) {
337 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
338 } else {
339 self.state.system.close_chain();
340 callback.respond(Ok(()));
341 }
342 }
343
344 ChangeApplicationPermissions {
345 application_id,
346 application_permissions,
347 callback,
348 } => {
349 let app_permissions = self.state.system.application_permissions.get();
350 if !app_permissions.can_change_application_permissions(&application_id) {
351 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
352 } else {
353 self.state
354 .system
355 .application_permissions
356 .set(application_permissions);
357 callback.respond(Ok(()));
358 }
359 }
360
361 CreateApplication {
362 chain_id,
363 block_height,
364 module_id,
365 parameters,
366 required_application_ids,
367 callback,
368 } => {
369 let create_application_result = self
370 .state
371 .system
372 .create_application(
373 chain_id,
374 block_height,
375 module_id,
376 parameters,
377 required_application_ids,
378 self.txn_tracker,
379 )
380 .await?;
381 callback.respond(create_application_result);
382 }
383
384 PerformHttpRequest {
385 request,
386 http_responses_are_oracle_responses,
387 callback,
388 } => {
389 let system = &mut self.state.system;
390 let response = self
391 .txn_tracker
392 .oracle(|| async {
393 let headers = request
394 .headers
395 .into_iter()
396 .map(|http::Header { name, value }| {
397 Ok((name.parse()?, value.try_into()?))
398 })
399 .collect::<Result<HeaderMap, ExecutionError>>()?;
400
401 let url = Url::parse(&request.url)?;
402 let host = url
403 .host_str()
404 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
405
406 let (_epoch, committee) = system
407 .current_committee()
408 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
409 let allowed_hosts = &committee.policy().http_request_allow_list;
410
411 ensure!(
412 allowed_hosts.contains(host),
413 ExecutionError::UnauthorizedHttpRequest(url)
414 );
415
416 let request = Client::new()
417 .request(request.method.into(), url)
418 .body(request.body)
419 .headers(headers);
420 #[cfg(not(web))]
421 let request = request.timeout(linera_base::time::Duration::from_millis(
422 committee.policy().http_request_timeout_ms,
423 ));
424
425 let response = request.send().await?;
426
427 let mut response_size_limit =
428 committee.policy().maximum_http_response_bytes;
429
430 if http_responses_are_oracle_responses {
431 response_size_limit = response_size_limit
432 .min(committee.policy().maximum_oracle_response_bytes);
433 }
434 Ok(OracleResponse::Http(
435 Self::receive_http_response(response, response_size_limit).await?,
436 ))
437 })
438 .await?
439 .to_http_response()?;
440 callback.respond(response);
441 }
442
443 ReadBlobContent { blob_id, callback } => {
444 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
445 content.clone()
446 } else {
447 let content = self.state.system.read_blob_content(blob_id).await?;
448 if blob_id.blob_type == BlobType::Data {
449 self.resource_controller
450 .with_state(&mut self.state.system)
451 .await?
452 .track_blob_read(content.bytes().len() as u64)?;
453 }
454 self.state
455 .system
456 .blob_used(self.txn_tracker, blob_id)
457 .await?;
458 content
459 };
460 callback.respond(content)
461 }
462
463 AssertBlobExists { blob_id, callback } => {
464 self.state.system.assert_blob_exists(blob_id).await?;
465 if blob_id.blob_type == BlobType::Data {
467 self.resource_controller
468 .with_state(&mut self.state.system)
469 .await?
470 .track_blob_read(0)?;
471 }
472 let is_new = self
473 .state
474 .system
475 .blob_used(self.txn_tracker, blob_id)
476 .await?;
477 if is_new {
478 self.txn_tracker
479 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
480 }
481 callback.respond(());
482 }
483
484 Emit {
485 stream_id,
486 value,
487 callback,
488 } => {
489 let count = self
490 .state
491 .system
492 .stream_event_counts
493 .get_mut_or_default(&stream_id)
494 .await?;
495 let index = *count;
496 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
497 self.resource_controller
498 .with_state(&mut self.state.system)
499 .await?
500 .track_event_published(&value)?;
501 self.txn_tracker.add_event(stream_id, index, value);
502 callback.respond(index)
503 }
504
505 ReadEvent { event_id, callback } => {
506 let extra = self.state.context().extra();
507 let event = self
508 .txn_tracker
509 .oracle(|| async {
510 let event = extra
511 .get_event(event_id.clone())
512 .await?
513 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
514 Ok(OracleResponse::Event(event_id.clone(), event))
515 })
516 .await?
517 .to_event(&event_id)?;
518 self.resource_controller
519 .with_state(&mut self.state.system)
520 .await?
521 .track_event_read(event.len() as u64)?;
522 callback.respond(event);
523 }
524
525 SubscribeToEvents {
526 chain_id,
527 stream_id,
528 subscriber_app_id,
529 callback,
530 } => {
531 let subscriptions = self
532 .state
533 .system
534 .event_subscriptions
535 .get_mut_or_default(&(chain_id, stream_id.clone()))
536 .await?;
537 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
538 subscriptions.next_index
539 } else {
540 0
541 };
542 self.txn_tracker.add_stream_to_process(
543 subscriber_app_id,
544 chain_id,
545 stream_id,
546 0,
547 next_index,
548 );
549 callback.respond(());
550 }
551
552 UnsubscribeFromEvents {
553 chain_id,
554 stream_id,
555 subscriber_app_id,
556 callback,
557 } => {
558 let key = (chain_id, stream_id.clone());
559 let subscriptions = self
560 .state
561 .system
562 .event_subscriptions
563 .get_mut_or_default(&key)
564 .await?;
565 subscriptions.applications.remove(&subscriber_app_id);
566 if subscriptions.applications.is_empty() {
567 self.state.system.event_subscriptions.remove(&key)?;
568 }
569 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
570 self.txn_tracker
571 .remove_stream_to_process(app_id, chain_id, stream_id);
572 }
573 callback.respond(());
574 }
575
576 GetApplicationPermissions { callback } => {
577 let app_permissions = self.state.system.application_permissions.get();
578 callback.respond(app_permissions.clone());
579 }
580
581 QueryServiceOracle {
582 deadline,
583 application_id,
584 next_block_height,
585 query,
586 callback,
587 } => {
588 let state = &mut self.state;
589 let local_time = self.txn_tracker.local_time();
590 let created_blobs = self.txn_tracker.created_blobs().clone();
591 let bytes = self
592 .txn_tracker
593 .oracle(|| async {
594 let context = QueryContext {
595 chain_id: state.context().extra().chain_id(),
596 next_block_height,
597 local_time,
598 };
599 let QueryOutcome {
600 response,
601 operations,
602 } = Box::pin(state.query_user_application_with_deadline(
603 application_id,
604 context,
605 query,
606 deadline,
607 created_blobs,
608 ))
609 .await?;
610 ensure!(
611 operations.is_empty(),
612 ExecutionError::ServiceOracleQueryOperations(operations)
613 );
614 Ok(OracleResponse::Service(response))
615 })
616 .await?
617 .to_service_response()?;
618 callback.respond(bytes);
619 }
620
621 AddOutgoingMessage { message, callback } => {
622 self.txn_tracker.add_outgoing_message(message);
623 callback.respond(());
624 }
625
626 SetLocalTime {
627 local_time,
628 callback,
629 } => {
630 self.txn_tracker.set_local_time(local_time);
631 callback.respond(());
632 }
633
634 AssertBefore {
635 timestamp,
636 callback,
637 } => {
638 let result = if !self
639 .txn_tracker
640 .replay_oracle_response(OracleResponse::Assert)?
641 {
642 let local_time = self.txn_tracker.local_time();
644 if local_time >= timestamp {
645 Err(ExecutionError::AssertBefore {
646 timestamp,
647 local_time,
648 })
649 } else {
650 Ok(())
651 }
652 } else {
653 Ok(())
654 };
655 callback.respond(result);
656 }
657
658 AddCreatedBlob { blob, callback } => {
659 self.txn_tracker.add_created_blob(blob);
660 callback.respond(());
661 }
662
663 ValidationRound { round, callback } => {
664 let validation_round = self
665 .txn_tracker
666 .oracle(|| async { Ok(OracleResponse::Round(round)) })
667 .await?
668 .to_round()?;
669 callback.respond(validation_round);
670 }
671 }
672
673 Ok(())
674 }
675
676 async fn process_subscriptions(
679 &mut self,
680 context: ProcessStreamsContext,
681 ) -> Result<(), ExecutionError> {
682 let mut processed = BTreeSet::new();
685 loop {
686 let to_process = self
687 .txn_tracker
688 .take_streams_to_process()
689 .into_iter()
690 .filter_map(|(app_id, updates)| {
691 let updates = updates
692 .into_iter()
693 .filter_map(|update| {
694 if !processed.insert((
695 app_id,
696 update.chain_id,
697 update.stream_id.clone(),
698 )) {
699 return None;
700 }
701 Some(update)
702 })
703 .collect::<Vec<_>>();
704 if updates.is_empty() {
705 return None;
706 }
707 Some((app_id, updates))
708 })
709 .collect::<BTreeMap<_, _>>();
710 if to_process.is_empty() {
711 return Ok(());
712 }
713 for (app_id, updates) in to_process {
714 self.run_user_action(
715 app_id,
716 UserAction::ProcessStreams(context, updates),
717 None,
718 None,
719 )
720 .await?;
721 }
722 }
723 }
724
725 pub(crate) async fn run_user_action(
726 &mut self,
727 application_id: ApplicationId,
728 action: UserAction,
729 refund_grant_to: Option<Account>,
730 grant: Option<&mut Amount>,
731 ) -> Result<(), ExecutionError> {
732 let ExecutionRuntimeConfig {} = self.state.context().extra().execution_runtime_config();
733 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
734 .await
735 }
736
/// Runs `action` for the application `application_id` inside a `ContractSyncRuntime`
/// spawned as a blocking task, serving the runtime's [`ExecutionRequest`]s from this actor
/// until the request channel ends.
///
/// Afterwards the task's result is recorded in the transaction tracker and the balance
/// and tracker of the task's resource controller are merged back into
/// `self.resource_controller`.
async fn run_user_action_with_runtime(
    &mut self,
    application_id: ApplicationId,
    action: UserAction,
    refund_grant_to: Option<Account>,
    grant: Option<&mut Amount>,
) -> Result<(), ExecutionError> {
    let chain_id = self.state.context().extra().chain_id();
    // A copy of the grant value is used here; the caller's `grant` itself is only handed
    // over for the final `merge_balance` below.
    let mut cloned_grant = grant.as_ref().map(|x| **x);
    let initial_balance = self
        .resource_controller
        .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
        .await?
        .balance()?;
    // Separate controller for the runtime task, seeded with the current policy, tracker
    // and balance.
    let controller = ResourceController::new(
        self.resource_controller.policy().clone(),
        self.resource_controller.tracker,
        initial_balance,
    );
    let (execution_state_sender, mut execution_state_receiver) =
        futures::channel::mpsc::unbounded();

    let (code, description) = self.load_contract(application_id).await?;

    let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
        let runtime = ContractSyncRuntime::new(
            execution_state_sender,
            chain_id,
            refund_grant_to,
            controller,
            &action,
        );

        async move {
            // The contract code arrives through `codes`; it is sent right after the task
            // has been spawned.
            let code = codes.next().await.expect("we send this immediately below");
            runtime.preload_contract(application_id, code, description)?;
            runtime.run_action(application_id, chain_id, action)
        }
    })
    .await;

    contract_runtime_task.send(code)?;

    // Serve requests until the receiver yields `None` (presumably once the runtime has
    // dropped its sender — confirm).
    while let Some(request) = execution_state_receiver.next().await {
        self.handle_request(request).await?;
    }

    let (result, controller) = contract_runtime_task.join().await?;

    self.txn_tracker.add_operation_result(result);

    // Fold the task's final balance and tracker back into the actor's controller.
    self.resource_controller
        .with_state_and_grant(&mut self.state.system, grant)
        .await?
        .merge_balance(initial_balance, controller.balance()?)?;
    self.resource_controller.tracker = controller.tracker;

    Ok(())
}
796
797 pub async fn execute_operation(
798 &mut self,
799 context: OperationContext,
800 operation: Operation,
801 ) -> Result<(), ExecutionError> {
802 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
803 match operation {
804 Operation::System(op) => {
805 let new_application = self
806 .state
807 .system
808 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
809 .await?;
810 if let Some((application_id, argument)) = new_application {
811 let user_action = UserAction::Instantiate(context, argument);
812 self.run_user_action(
813 application_id,
814 user_action,
815 context.refund_grant_to(),
816 None,
817 )
818 .await?;
819 }
820 }
821 Operation::User {
822 application_id,
823 bytes,
824 } => {
825 self.run_user_action(
826 application_id,
827 UserAction::Operation(context, bytes),
828 context.refund_grant_to(),
829 None,
830 )
831 .await?;
832 }
833 }
834 self.process_subscriptions(context.into()).await?;
835 Ok(())
836 }
837
838 pub async fn execute_message(
839 &mut self,
840 context: MessageContext,
841 message: Message,
842 grant: Option<&mut Amount>,
843 ) -> Result<(), ExecutionError> {
844 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
845 match message {
846 Message::System(message) => {
847 let outcome = self.state.system.execute_message(context, message).await?;
848 self.txn_tracker.add_outgoing_messages(outcome);
849 }
850 Message::User {
851 application_id,
852 bytes,
853 } => {
854 self.run_user_action(
855 application_id,
856 UserAction::Message(context, bytes),
857 context.refund_grant_to,
858 grant,
859 )
860 .await?;
861 }
862 }
863 self.process_subscriptions(context.into()).await?;
864 Ok(())
865 }
866
867 pub fn bounce_message(
868 &mut self,
869 context: MessageContext,
870 grant: Amount,
871 message: Message,
872 ) -> Result<(), ExecutionError> {
873 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
874 self.txn_tracker.add_outgoing_message(OutgoingMessage {
875 destination: context.origin,
876 authenticated_owner: context.authenticated_owner,
877 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
878 grant,
879 kind: MessageKind::Bouncing,
880 message,
881 });
882 Ok(())
883 }
884
885 pub fn send_refund(
886 &mut self,
887 context: MessageContext,
888 amount: Amount,
889 ) -> Result<(), ExecutionError> {
890 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
891 if amount.is_zero() {
892 return Ok(());
893 }
894 let Some(account) = context.refund_grant_to else {
895 return Err(ExecutionError::InternalError(
896 "Messages with grants should have a non-empty `refund_grant_to`",
897 ));
898 };
899 let message = SystemMessage::Credit {
900 amount,
901 source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
902 target: account.owner,
903 };
904 self.txn_tracker.add_outgoing_message(
905 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
906 );
907 Ok(())
908 }
909
910 async fn receive_http_response(
914 response: reqwest::Response,
915 size_limit: u64,
916 ) -> Result<http::Response, ExecutionError> {
917 let status = response.status().as_u16();
918 let maybe_content_length = response.content_length();
919
920 let headers = response
921 .headers()
922 .iter()
923 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
924 .collect::<Vec<_>>();
925
926 let total_header_size = headers
927 .iter()
928 .map(|header| (header.name.len() + header.value.len()) as u64)
929 .sum();
930
931 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
932 ExecutionError::HttpResponseSizeLimitExceeded {
933 limit: size_limit,
934 size: total_header_size,
935 },
936 )?;
937
938 if let Some(content_length) = maybe_content_length {
939 if content_length > remaining_bytes {
940 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
941 limit: size_limit,
942 size: content_length + total_header_size,
943 });
944 }
945 }
946
947 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
948 let mut body_stream = response.bytes_stream();
949
950 while let Some(bytes) = body_stream.next().await.transpose()? {
951 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
952 ExecutionError::HttpResponseSizeLimitExceeded {
953 limit: size_limit,
954 size: bytes.len() as u64 + (size_limit - remaining_bytes),
955 },
956 )?;
957
958 body.extend(&bytes);
959 }
960
961 Ok(http::Response {
962 status,
963 headers,
964 body,
965 })
966 }
967}
968
969#[derive(Debug)]
971pub enum ExecutionRequest {
972 #[cfg(not(web))]
973 LoadContract {
974 id: ApplicationId,
975 #[debug(skip)]
976 callback: Sender<(UserContractCode, ApplicationDescription)>,
977 },
978
979 #[cfg(not(web))]
980 LoadService {
981 id: ApplicationId,
982 #[debug(skip)]
983 callback: Sender<(UserServiceCode, ApplicationDescription)>,
984 },
985
986 ChainBalance {
987 #[debug(skip)]
988 callback: Sender<Amount>,
989 },
990
991 OwnerBalance {
992 owner: AccountOwner,
993 #[debug(skip)]
994 callback: Sender<Amount>,
995 },
996
997 OwnerBalances {
998 #[debug(skip)]
999 callback: Sender<Vec<(AccountOwner, Amount)>>,
1000 },
1001
1002 BalanceOwners {
1003 #[debug(skip)]
1004 callback: Sender<Vec<AccountOwner>>,
1005 },
1006
1007 Transfer {
1008 source: AccountOwner,
1009 destination: Account,
1010 amount: Amount,
1011 #[debug(skip_if = Option::is_none)]
1012 signer: Option<AccountOwner>,
1013 application_id: ApplicationId,
1014 #[debug(skip)]
1015 callback: Sender<()>,
1016 },
1017
1018 Claim {
1019 source: Account,
1020 destination: Account,
1021 amount: Amount,
1022 #[debug(skip_if = Option::is_none)]
1023 signer: Option<AccountOwner>,
1024 application_id: ApplicationId,
1025 #[debug(skip)]
1026 callback: Sender<()>,
1027 },
1028
1029 SystemTimestamp {
1030 #[debug(skip)]
1031 callback: Sender<Timestamp>,
1032 },
1033
1034 ChainOwnership {
1035 #[debug(skip)]
1036 callback: Sender<ChainOwnership>,
1037 },
1038
1039 ReadValueBytes {
1040 id: ApplicationId,
1041 #[debug(with = hex_debug)]
1042 key: Vec<u8>,
1043 #[debug(skip)]
1044 callback: Sender<Option<Vec<u8>>>,
1045 },
1046
1047 ContainsKey {
1048 id: ApplicationId,
1049 key: Vec<u8>,
1050 #[debug(skip)]
1051 callback: Sender<bool>,
1052 },
1053
1054 ContainsKeys {
1055 id: ApplicationId,
1056 #[debug(with = hex_vec_debug)]
1057 keys: Vec<Vec<u8>>,
1058 callback: Sender<Vec<bool>>,
1059 },
1060
1061 ReadMultiValuesBytes {
1062 id: ApplicationId,
1063 #[debug(with = hex_vec_debug)]
1064 keys: Vec<Vec<u8>>,
1065 #[debug(skip)]
1066 callback: Sender<Vec<Option<Vec<u8>>>>,
1067 },
1068
1069 FindKeysByPrefix {
1070 id: ApplicationId,
1071 #[debug(with = hex_debug)]
1072 key_prefix: Vec<u8>,
1073 #[debug(skip)]
1074 callback: Sender<Vec<Vec<u8>>>,
1075 },
1076
1077 FindKeyValuesByPrefix {
1078 id: ApplicationId,
1079 #[debug(with = hex_debug)]
1080 key_prefix: Vec<u8>,
1081 #[debug(skip)]
1082 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1083 },
1084
1085 WriteBatch {
1086 id: ApplicationId,
1087 batch: Batch,
1088 #[debug(skip)]
1089 callback: Sender<()>,
1090 },
1091
1092 OpenChain {
1093 ownership: ChainOwnership,
1094 #[debug(skip_if = Amount::is_zero)]
1095 balance: Amount,
1096 parent_id: ChainId,
1097 block_height: BlockHeight,
1098 application_permissions: ApplicationPermissions,
1099 timestamp: Timestamp,
1100 #[debug(skip)]
1101 callback: Sender<ChainId>,
1102 },
1103
1104 CloseChain {
1105 application_id: ApplicationId,
1106 #[debug(skip)]
1107 callback: Sender<Result<(), ExecutionError>>,
1108 },
1109
1110 ChangeApplicationPermissions {
1111 application_id: ApplicationId,
1112 application_permissions: ApplicationPermissions,
1113 #[debug(skip)]
1114 callback: Sender<Result<(), ExecutionError>>,
1115 },
1116
1117 CreateApplication {
1118 chain_id: ChainId,
1119 block_height: BlockHeight,
1120 module_id: ModuleId,
1121 parameters: Vec<u8>,
1122 required_application_ids: Vec<ApplicationId>,
1123 #[debug(skip)]
1124 callback: Sender<CreateApplicationResult>,
1125 },
1126
1127 PerformHttpRequest {
1128 request: http::Request,
1129 http_responses_are_oracle_responses: bool,
1130 #[debug(skip)]
1131 callback: Sender<http::Response>,
1132 },
1133
1134 ReadBlobContent {
1135 blob_id: BlobId,
1136 #[debug(skip)]
1137 callback: Sender<BlobContent>,
1138 },
1139
1140 AssertBlobExists {
1141 blob_id: BlobId,
1142 #[debug(skip)]
1143 callback: Sender<()>,
1144 },
1145
1146 Emit {
1147 stream_id: StreamId,
1148 #[debug(with = hex_debug)]
1149 value: Vec<u8>,
1150 #[debug(skip)]
1151 callback: Sender<u32>,
1152 },
1153
1154 ReadEvent {
1155 event_id: EventId,
1156 callback: oneshot::Sender<Vec<u8>>,
1157 },
1158
1159 SubscribeToEvents {
1160 chain_id: ChainId,
1161 stream_id: StreamId,
1162 subscriber_app_id: ApplicationId,
1163 #[debug(skip)]
1164 callback: Sender<()>,
1165 },
1166
1167 UnsubscribeFromEvents {
1168 chain_id: ChainId,
1169 stream_id: StreamId,
1170 subscriber_app_id: ApplicationId,
1171 #[debug(skip)]
1172 callback: Sender<()>,
1173 },
1174
1175 GetApplicationPermissions {
1176 #[debug(skip)]
1177 callback: Sender<ApplicationPermissions>,
1178 },
1179
1180 QueryServiceOracle {
1181 deadline: Option<Instant>,
1182 application_id: ApplicationId,
1183 next_block_height: BlockHeight,
1184 query: Vec<u8>,
1185 #[debug(skip)]
1186 callback: Sender<Vec<u8>>,
1187 },
1188
1189 AddOutgoingMessage {
1190 message: crate::OutgoingMessage,
1191 #[debug(skip)]
1192 callback: Sender<()>,
1193 },
1194
1195 SetLocalTime {
1196 local_time: Timestamp,
1197 #[debug(skip)]
1198 callback: Sender<()>,
1199 },
1200
1201 AssertBefore {
1202 timestamp: Timestamp,
1203 #[debug(skip)]
1204 callback: Sender<Result<(), ExecutionError>>,
1205 },
1206
1207 AddCreatedBlob {
1208 blob: crate::Blob,
1209 #[debug(skip)]
1210 callback: Sender<()>,
1211 },
1212
1213 ValidationRound {
1214 round: Option<u32>,
1215 #[debug(skip)]
1216 callback: Sender<Option<u32>>,
1217 },
1218}