1use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15 Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20 time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27 execution::UserAction,
28 runtime::ContractSyncRuntime,
29 system::{CreateApplicationResult, OpenChainConfig},
30 util::{OracleResponseExt as _, RespondExt as _},
31 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeConfig,
32 ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, MessageKind, ModuleId,
33 Operation, OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext,
34 QueryOutcome, ResourceController, SystemMessage, TransactionTracker, UserContractCode,
35 UserServiceCode,
36};
37
/// Actor that services [`ExecutionRequest`]s against the execution state of a chain.
///
/// The actor only borrows its collaborators, so every change made while handling
/// requests remains visible to the owner of those values afterwards.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// Tracker collecting the effects of the current transaction, such as outgoing
    /// messages, events, created blobs and oracle responses.
    txn_tracker: &'a mut TransactionTracker,
    /// Controller used to account for the resources consumed during execution.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
44
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram recording how long loading a contract takes.
    ///
    /// Registered lazily, on first access. NOTE(review): the `250.0` argument is
    /// presumably the upper bound of the latency buckets; confirm the unit against
    /// `exponential_bucket_latencies`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_latencies(250.0);
        register_histogram_vec(
            "load_contract_latency",
            "Load contract latency",
            &[],
            buckets,
        )
    });

    /// Histogram recording how long loading a service takes.
    ///
    /// Registered lazily, on first access, with the same bucket layout as
    /// [`LOAD_CONTRACT_LATENCY`].
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_latencies(250.0);
        register_histogram_vec(
            "load_service_latency",
            "Load service latency",
            &[],
            buckets,
        )
    });
}
72
/// Sending half of the unbounded channel over which [`ExecutionRequest`]s are submitted.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77 C: Context + Clone + Send + Sync + 'static,
78 C::Extra: ExecutionRuntimeContext,
79{
80 pub fn new(
82 state: &'a mut ExecutionStateView<C>,
83 txn_tracker: &'a mut TransactionTracker,
84 resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85 ) -> Self {
86 Self {
87 state,
88 txn_tracker,
89 resource_controller,
90 }
91 }
92
93 pub(crate) async fn load_contract(
94 &mut self,
95 id: ApplicationId,
96 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
97 #[cfg(with_metrics)]
98 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
99 let blob_id = id.description_blob_id();
100 let description = match self.txn_tracker.get_blob_content(&blob_id) {
101 Some(blob) => bcs::from_bytes(blob.bytes())?,
102 None => {
103 self.state
104 .system
105 .describe_application(id, self.txn_tracker)
106 .await?
107 }
108 };
109 let code = self
110 .state
111 .context()
112 .extra()
113 .get_user_contract(&description, self.txn_tracker)
114 .await?;
115 Ok((code, description))
116 }
117
118 pub(crate) async fn load_service(
119 &mut self,
120 id: ApplicationId,
121 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
122 #[cfg(with_metrics)]
123 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
124 let blob_id = id.description_blob_id();
125 let description = match self.txn_tracker.get_blob_content(&blob_id) {
126 Some(blob) => bcs::from_bytes(blob.bytes())?,
127 None => {
128 self.state
129 .system
130 .describe_application(id, self.txn_tracker)
131 .await?
132 }
133 };
134 let code = self
135 .state
136 .context()
137 .extra()
138 .get_user_service(&description, self.txn_tracker)
139 .await?;
140 Ok((code, description))
141 }
142
143 pub(crate) async fn handle_request(
145 &mut self,
146 request: ExecutionRequest,
147 ) -> Result<(), ExecutionError> {
148 use ExecutionRequest::*;
149 match request {
150 #[cfg(not(web))]
151 LoadContract { id, callback } => {
152 let (code, description) = self.load_contract(id).await?;
153 callback.respond((code, description))
154 }
155 #[cfg(not(web))]
156 LoadService { id, callback } => {
157 let (code, description) = self.load_service(id).await?;
158 callback.respond((code, description))
159 }
160
161 ChainBalance { callback } => {
162 let balance = *self.state.system.balance.get();
163 callback.respond(balance);
164 }
165
166 OwnerBalance { owner, callback } => {
167 let balance = self
168 .state
169 .system
170 .balances
171 .get(&owner)
172 .await?
173 .unwrap_or_default();
174 callback.respond(balance);
175 }
176
177 OwnerBalances { callback } => {
178 callback.respond(self.state.system.balances.index_values().await?);
179 }
180
181 BalanceOwners { callback } => {
182 let owners = self.state.system.balances.indices().await?;
183 callback.respond(owners);
184 }
185
186 Transfer {
187 source,
188 destination,
189 amount,
190 signer,
191 application_id,
192 callback,
193 } => {
194 let maybe_message = self
195 .state
196 .system
197 .transfer(signer, Some(application_id), source, destination, amount)
198 .await?;
199 self.txn_tracker.add_outgoing_messages(maybe_message);
200 callback.respond(());
201 }
202
203 Claim {
204 source,
205 destination,
206 amount,
207 signer,
208 application_id,
209 callback,
210 } => {
211 let maybe_message = self
212 .state
213 .system
214 .claim(
215 signer,
216 Some(application_id),
217 source.owner,
218 source.chain_id,
219 destination,
220 amount,
221 )
222 .await?;
223 self.txn_tracker.add_outgoing_messages(maybe_message);
224 callback.respond(());
225 }
226
227 SystemTimestamp { callback } => {
228 let timestamp = *self.state.system.timestamp.get();
229 callback.respond(timestamp);
230 }
231
232 ChainOwnership { callback } => {
233 let ownership = self.state.system.ownership.get().clone();
234 callback.respond(ownership);
235 }
236
237 ContainsKey { id, key, callback } => {
238 let view = self.state.users.try_load_entry(&id).await?;
239 let result = match view {
240 Some(view) => view.contains_key(&key).await?,
241 None => false,
242 };
243 callback.respond(result);
244 }
245
246 ContainsKeys { id, keys, callback } => {
247 let view = self.state.users.try_load_entry(&id).await?;
248 let result = match view {
249 Some(view) => view.contains_keys(keys).await?,
250 None => vec![false; keys.len()],
251 };
252 callback.respond(result);
253 }
254
255 ReadMultiValuesBytes { id, keys, callback } => {
256 let view = self.state.users.try_load_entry(&id).await?;
257 let values = match view {
258 Some(view) => view.multi_get(keys).await?,
259 None => vec![None; keys.len()],
260 };
261 callback.respond(values);
262 }
263
264 ReadValueBytes { id, key, callback } => {
265 let view = self.state.users.try_load_entry(&id).await?;
266 let result = match view {
267 Some(view) => view.get(&key).await?,
268 None => None,
269 };
270 callback.respond(result);
271 }
272
273 FindKeysByPrefix {
274 id,
275 key_prefix,
276 callback,
277 } => {
278 let view = self.state.users.try_load_entry(&id).await?;
279 let result = match view {
280 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
281 None => Vec::new(),
282 };
283 callback.respond(result);
284 }
285
286 FindKeyValuesByPrefix {
287 id,
288 key_prefix,
289 callback,
290 } => {
291 let view = self.state.users.try_load_entry(&id).await?;
292 let result = match view {
293 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
294 None => Vec::new(),
295 };
296 callback.respond(result);
297 }
298
299 WriteBatch {
300 id,
301 batch,
302 callback,
303 } => {
304 let mut view = self.state.users.try_load_entry_mut(&id).await?;
305 view.write_batch(batch).await?;
306 callback.respond(());
307 }
308
309 OpenChain {
310 ownership,
311 balance,
312 parent_id,
313 block_height,
314 application_permissions,
315 timestamp,
316 callback,
317 } => {
318 let config = OpenChainConfig {
319 ownership,
320 balance,
321 application_permissions,
322 };
323 let chain_id = self
324 .state
325 .system
326 .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
327 .await?;
328 callback.respond(chain_id);
329 }
330
331 CloseChain {
332 application_id,
333 callback,
334 } => {
335 let app_permissions = self.state.system.application_permissions.get();
336 if !app_permissions.can_close_chain(&application_id) {
337 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
338 } else {
339 self.state.system.close_chain();
340 callback.respond(Ok(()));
341 }
342 }
343
344 ChangeApplicationPermissions {
345 application_id,
346 application_permissions,
347 callback,
348 } => {
349 let app_permissions = self.state.system.application_permissions.get();
350 if !app_permissions.can_change_application_permissions(&application_id) {
351 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
352 } else {
353 self.state
354 .system
355 .application_permissions
356 .set(application_permissions);
357 callback.respond(Ok(()));
358 }
359 }
360
361 CreateApplication {
362 chain_id,
363 block_height,
364 module_id,
365 parameters,
366 required_application_ids,
367 callback,
368 } => {
369 let create_application_result = self
370 .state
371 .system
372 .create_application(
373 chain_id,
374 block_height,
375 module_id,
376 parameters,
377 required_application_ids,
378 self.txn_tracker,
379 )
380 .await?;
381 callback.respond(create_application_result);
382 }
383
384 PerformHttpRequest {
385 request,
386 http_responses_are_oracle_responses,
387 callback,
388 } => {
389 let system = &mut self.state.system;
390 let response = self
391 .txn_tracker
392 .oracle(|| async {
393 let headers = request
394 .headers
395 .into_iter()
396 .map(|http::Header { name, value }| {
397 Ok((name.parse()?, value.try_into()?))
398 })
399 .collect::<Result<HeaderMap, ExecutionError>>()?;
400
401 let url = Url::parse(&request.url)?;
402 let host = url
403 .host_str()
404 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
405
406 let (_epoch, committee) = system
407 .current_committee()
408 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
409 let allowed_hosts = &committee.policy().http_request_allow_list;
410
411 ensure!(
412 allowed_hosts.contains(host),
413 ExecutionError::UnauthorizedHttpRequest(url)
414 );
415
416 let request = Client::new()
417 .request(request.method.into(), url)
418 .body(request.body)
419 .headers(headers);
420 #[cfg(not(web))]
421 let request = request.timeout(linera_base::time::Duration::from_millis(
422 committee.policy().http_request_timeout_ms,
423 ));
424
425 let response = request.send().await?;
426
427 let mut response_size_limit =
428 committee.policy().maximum_http_response_bytes;
429
430 if http_responses_are_oracle_responses {
431 response_size_limit = response_size_limit
432 .min(committee.policy().maximum_oracle_response_bytes);
433 }
434 Ok(OracleResponse::Http(
435 Self::receive_http_response(response, response_size_limit).await?,
436 ))
437 })
438 .await?
439 .to_http_response()?;
440 callback.respond(response);
441 }
442
443 ReadBlobContent { blob_id, callback } => {
444 let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
445 content.clone()
446 } else {
447 let content = self.state.system.read_blob_content(blob_id).await?;
448 if blob_id.blob_type == BlobType::Data {
449 self.resource_controller
450 .with_state(&mut self.state.system)
451 .await?
452 .track_blob_read(content.bytes().len() as u64)?;
453 }
454 self.state
455 .system
456 .blob_used(self.txn_tracker, blob_id)
457 .await?;
458 content
459 };
460 callback.respond(content)
461 }
462
463 AssertBlobExists { blob_id, callback } => {
464 self.state.system.assert_blob_exists(blob_id).await?;
465 if blob_id.blob_type == BlobType::Data {
467 self.resource_controller
468 .with_state(&mut self.state.system)
469 .await?
470 .track_blob_read(0)?;
471 }
472 let is_new = self
473 .state
474 .system
475 .blob_used(self.txn_tracker, blob_id)
476 .await?;
477 if is_new {
478 self.txn_tracker
479 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
480 }
481 callback.respond(());
482 }
483
484 Emit {
485 stream_id,
486 value,
487 callback,
488 } => {
489 let count = self
490 .state
491 .stream_event_counts
492 .get_mut_or_default(&stream_id)
493 .await?;
494 let index = *count;
495 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
496 self.txn_tracker.add_event(stream_id, index, value);
497 callback.respond(index)
498 }
499
500 ReadEvent { event_id, callback } => {
501 let extra = self.state.context().extra();
502 let event = self
503 .txn_tracker
504 .oracle(|| async {
505 let event = extra
506 .get_event(event_id.clone())
507 .await?
508 .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
509 Ok(OracleResponse::Event(event_id.clone(), event))
510 })
511 .await?
512 .to_event(&event_id)?;
513 callback.respond(event);
514 }
515
516 SubscribeToEvents {
517 chain_id,
518 stream_id,
519 subscriber_app_id,
520 callback,
521 } => {
522 let subscriptions = self
523 .state
524 .system
525 .event_subscriptions
526 .get_mut_or_default(&(chain_id, stream_id.clone()))
527 .await?;
528 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
529 subscriptions.next_index
530 } else {
531 0
532 };
533 self.txn_tracker.add_stream_to_process(
534 subscriber_app_id,
535 chain_id,
536 stream_id,
537 0,
538 next_index,
539 );
540 callback.respond(());
541 }
542
543 UnsubscribeFromEvents {
544 chain_id,
545 stream_id,
546 subscriber_app_id,
547 callback,
548 } => {
549 let key = (chain_id, stream_id.clone());
550 let subscriptions = self
551 .state
552 .system
553 .event_subscriptions
554 .get_mut_or_default(&key)
555 .await?;
556 subscriptions.applications.remove(&subscriber_app_id);
557 if subscriptions.applications.is_empty() {
558 self.state.system.event_subscriptions.remove(&key)?;
559 }
560 if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
561 self.txn_tracker
562 .remove_stream_to_process(app_id, chain_id, stream_id);
563 }
564 callback.respond(());
565 }
566
567 GetApplicationPermissions { callback } => {
568 let app_permissions = self.state.system.application_permissions.get();
569 callback.respond(app_permissions.clone());
570 }
571
572 QueryServiceOracle {
573 deadline,
574 application_id,
575 next_block_height,
576 query,
577 callback,
578 } => {
579 let state = &mut self.state;
580 let local_time = self.txn_tracker.local_time();
581 let created_blobs = self.txn_tracker.created_blobs().clone();
582 let bytes = self
583 .txn_tracker
584 .oracle(|| async {
585 let context = QueryContext {
586 chain_id: state.context().extra().chain_id(),
587 next_block_height,
588 local_time,
589 };
590 let QueryOutcome {
591 response,
592 operations,
593 } = Box::pin(state.query_user_application_with_deadline(
594 application_id,
595 context,
596 query,
597 deadline,
598 created_blobs,
599 ))
600 .await?;
601 ensure!(
602 operations.is_empty(),
603 ExecutionError::ServiceOracleQueryOperations(operations)
604 );
605 Ok(OracleResponse::Service(response))
606 })
607 .await?
608 .to_service_response()?;
609 callback.respond(bytes);
610 }
611
612 AddOutgoingMessage { message, callback } => {
613 self.txn_tracker.add_outgoing_message(message);
614 callback.respond(());
615 }
616
617 SetLocalTime {
618 local_time,
619 callback,
620 } => {
621 self.txn_tracker.set_local_time(local_time);
622 callback.respond(());
623 }
624
625 AssertBefore {
626 timestamp,
627 callback,
628 } => {
629 let result = if !self
630 .txn_tracker
631 .replay_oracle_response(OracleResponse::Assert)?
632 {
633 let local_time = self.txn_tracker.local_time();
635 if local_time >= timestamp {
636 Err(ExecutionError::AssertBefore {
637 timestamp,
638 local_time,
639 })
640 } else {
641 Ok(())
642 }
643 } else {
644 Ok(())
645 };
646 callback.respond(result);
647 }
648
649 AddCreatedBlob { blob, callback } => {
650 self.txn_tracker.add_created_blob(blob);
651 callback.respond(());
652 }
653
654 ValidationRound { round, callback } => {
655 let validation_round = self
656 .txn_tracker
657 .oracle(|| async { Ok(OracleResponse::Round(round)) })
658 .await?
659 .to_round()?;
660 callback.respond(validation_round);
661 }
662 }
663
664 Ok(())
665 }
666
667 async fn process_subscriptions(
670 &mut self,
671 context: ProcessStreamsContext,
672 ) -> Result<(), ExecutionError> {
673 let mut processed = BTreeSet::new();
676 loop {
677 let to_process = self
678 .txn_tracker
679 .take_streams_to_process()
680 .into_iter()
681 .filter_map(|(app_id, updates)| {
682 let updates = updates
683 .into_iter()
684 .filter_map(|update| {
685 if !processed.insert((
686 app_id,
687 update.chain_id,
688 update.stream_id.clone(),
689 )) {
690 return None;
691 }
692 Some(update)
693 })
694 .collect::<Vec<_>>();
695 if updates.is_empty() {
696 return None;
697 }
698 Some((app_id, updates))
699 })
700 .collect::<BTreeMap<_, _>>();
701 if to_process.is_empty() {
702 return Ok(());
703 }
704 for (app_id, updates) in to_process {
705 self.run_user_action(
706 app_id,
707 UserAction::ProcessStreams(context, updates),
708 None,
709 None,
710 )
711 .await?;
712 }
713 }
714 }
715
716 pub(crate) async fn run_user_action(
717 &mut self,
718 application_id: ApplicationId,
719 action: UserAction,
720 refund_grant_to: Option<Account>,
721 grant: Option<&mut Amount>,
722 ) -> Result<(), ExecutionError> {
723 let ExecutionRuntimeConfig {} = self.state.context().extra().execution_runtime_config();
724 self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
725 .await
726 }
727
    /// Runs `action` on the contract of `application_id` inside a contract runtime
    /// spawned as a blocking task, serving the runtime's execution requests from this
    /// actor until the request channel is exhausted.
    ///
    /// On success the result of the action is recorded in the transaction tracker, and
    /// the balance and resource tracker of the runtime's controller are merged back
    /// into this actor's resource controller.
    ///
    /// # Errors
    ///
    /// Fails if the balance cannot be determined, the contract cannot be loaded, a
    /// request cannot be handled, the runtime task fails, or the balance cannot be
    /// merged back.
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.state.context().extra().chain_id();
        // Work on a copy of the grant here so that the caller's `grant` reference is
        // still available for the final `merge_balance` below.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = self
            .resource_controller
            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        // The runtime gets its own controller, seeded with the current policy, tracker
        // and balance.
        let controller = ResourceController::new(
            self.resource_controller.policy().clone(),
            self.resource_controller.tracker,
            initial_balance,
        );
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();

        let (code, description) = self.load_contract(application_id).await?;

        let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
            let runtime = ContractSyncRuntime::new(
                execution_state_sender,
                chain_id,
                refund_grant_to,
                controller,
                &action,
            );

            async move {
                // The contract code arrives through the task's input stream; it is
                // sent right after the task has been spawned.
                let code = codes.next().await.expect("we send this immediately below");
                runtime.preload_contract(application_id, code, description)?;
                runtime.run_action(application_id, chain_id, action)
            }
        })
        .await;

        contract_runtime_task.send(code)?;

        // Serve the runtime's requests until the channel yields `None`.
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request).await?;
        }

        let (result, controller) = contract_runtime_task.join().await?;

        self.txn_tracker.add_operation_result(result);

        // Fold what the runtime's controller ended up with back into our own
        // controller, this time against the caller's real `grant`.
        self.resource_controller
            .with_state_and_grant(&mut self.state.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        self.resource_controller.tracker = controller.tracker;

        Ok(())
    }
787
788 pub async fn execute_operation(
789 &mut self,
790 context: OperationContext,
791 operation: Operation,
792 ) -> Result<(), ExecutionError> {
793 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
794 match operation {
795 Operation::System(op) => {
796 let new_application = self
797 .state
798 .system
799 .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
800 .await?;
801 if let Some((application_id, argument)) = new_application {
802 let user_action = UserAction::Instantiate(context, argument);
803 self.run_user_action(
804 application_id,
805 user_action,
806 context.refund_grant_to(),
807 None,
808 )
809 .await?;
810 }
811 }
812 Operation::User {
813 application_id,
814 bytes,
815 } => {
816 self.run_user_action(
817 application_id,
818 UserAction::Operation(context, bytes),
819 context.refund_grant_to(),
820 None,
821 )
822 .await?;
823 }
824 }
825 self.process_subscriptions(context.into()).await?;
826 Ok(())
827 }
828
829 pub async fn execute_message(
830 &mut self,
831 context: MessageContext,
832 message: Message,
833 grant: Option<&mut Amount>,
834 ) -> Result<(), ExecutionError> {
835 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
836 match message {
837 Message::System(message) => {
838 let outcome = self.state.system.execute_message(context, message).await?;
839 self.txn_tracker.add_outgoing_messages(outcome);
840 }
841 Message::User {
842 application_id,
843 bytes,
844 } => {
845 self.run_user_action(
846 application_id,
847 UserAction::Message(context, bytes),
848 context.refund_grant_to,
849 grant,
850 )
851 .await?;
852 }
853 }
854 self.process_subscriptions(context.into()).await?;
855 Ok(())
856 }
857
858 pub fn bounce_message(
859 &mut self,
860 context: MessageContext,
861 grant: Amount,
862 message: Message,
863 ) -> Result<(), ExecutionError> {
864 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
865 self.txn_tracker.add_outgoing_message(OutgoingMessage {
866 destination: context.origin,
867 authenticated_signer: context.authenticated_signer,
868 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
869 grant,
870 kind: MessageKind::Bouncing,
871 message,
872 });
873 Ok(())
874 }
875
876 pub fn send_refund(
877 &mut self,
878 context: MessageContext,
879 amount: Amount,
880 ) -> Result<(), ExecutionError> {
881 assert_eq!(context.chain_id, self.state.context().extra().chain_id());
882 if amount.is_zero() {
883 return Ok(());
884 }
885 let Some(account) = context.refund_grant_to else {
886 return Err(ExecutionError::InternalError(
887 "Messages with grants should have a non-empty `refund_grant_to`",
888 ));
889 };
890 let message = SystemMessage::Credit {
891 amount,
892 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
893 target: account.owner,
894 };
895 self.txn_tracker.add_outgoing_message(
896 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
897 );
898 Ok(())
899 }
900
901 async fn receive_http_response(
905 response: reqwest::Response,
906 size_limit: u64,
907 ) -> Result<http::Response, ExecutionError> {
908 let status = response.status().as_u16();
909 let maybe_content_length = response.content_length();
910
911 let headers = response
912 .headers()
913 .iter()
914 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
915 .collect::<Vec<_>>();
916
917 let total_header_size = headers
918 .iter()
919 .map(|header| (header.name.len() + header.value.len()) as u64)
920 .sum();
921
922 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
923 ExecutionError::HttpResponseSizeLimitExceeded {
924 limit: size_limit,
925 size: total_header_size,
926 },
927 )?;
928
929 if let Some(content_length) = maybe_content_length {
930 if content_length > remaining_bytes {
931 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
932 limit: size_limit,
933 size: content_length + total_header_size,
934 });
935 }
936 }
937
938 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
939 let mut body_stream = response.bytes_stream();
940
941 while let Some(bytes) = body_stream.next().await.transpose()? {
942 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
943 ExecutionError::HttpResponseSizeLimitExceeded {
944 limit: size_limit,
945 size: bytes.len() as u64 + (size_limit - remaining_bytes),
946 },
947 )?;
948
949 body.extend(&bytes);
950 }
951
952 Ok(http::Response {
953 status,
954 headers,
955 body,
956 })
957 }
958}
959
960#[derive(Debug)]
962pub enum ExecutionRequest {
963 #[cfg(not(web))]
964 LoadContract {
965 id: ApplicationId,
966 #[debug(skip)]
967 callback: Sender<(UserContractCode, ApplicationDescription)>,
968 },
969
970 #[cfg(not(web))]
971 LoadService {
972 id: ApplicationId,
973 #[debug(skip)]
974 callback: Sender<(UserServiceCode, ApplicationDescription)>,
975 },
976
977 ChainBalance {
978 #[debug(skip)]
979 callback: Sender<Amount>,
980 },
981
982 OwnerBalance {
983 owner: AccountOwner,
984 #[debug(skip)]
985 callback: Sender<Amount>,
986 },
987
988 OwnerBalances {
989 #[debug(skip)]
990 callback: Sender<Vec<(AccountOwner, Amount)>>,
991 },
992
993 BalanceOwners {
994 #[debug(skip)]
995 callback: Sender<Vec<AccountOwner>>,
996 },
997
998 Transfer {
999 source: AccountOwner,
1000 destination: Account,
1001 amount: Amount,
1002 #[debug(skip_if = Option::is_none)]
1003 signer: Option<AccountOwner>,
1004 application_id: ApplicationId,
1005 #[debug(skip)]
1006 callback: Sender<()>,
1007 },
1008
1009 Claim {
1010 source: Account,
1011 destination: Account,
1012 amount: Amount,
1013 #[debug(skip_if = Option::is_none)]
1014 signer: Option<AccountOwner>,
1015 application_id: ApplicationId,
1016 #[debug(skip)]
1017 callback: Sender<()>,
1018 },
1019
1020 SystemTimestamp {
1021 #[debug(skip)]
1022 callback: Sender<Timestamp>,
1023 },
1024
1025 ChainOwnership {
1026 #[debug(skip)]
1027 callback: Sender<ChainOwnership>,
1028 },
1029
1030 ReadValueBytes {
1031 id: ApplicationId,
1032 #[debug(with = hex_debug)]
1033 key: Vec<u8>,
1034 #[debug(skip)]
1035 callback: Sender<Option<Vec<u8>>>,
1036 },
1037
1038 ContainsKey {
1039 id: ApplicationId,
1040 key: Vec<u8>,
1041 #[debug(skip)]
1042 callback: Sender<bool>,
1043 },
1044
1045 ContainsKeys {
1046 id: ApplicationId,
1047 #[debug(with = hex_vec_debug)]
1048 keys: Vec<Vec<u8>>,
1049 callback: Sender<Vec<bool>>,
1050 },
1051
1052 ReadMultiValuesBytes {
1053 id: ApplicationId,
1054 #[debug(with = hex_vec_debug)]
1055 keys: Vec<Vec<u8>>,
1056 #[debug(skip)]
1057 callback: Sender<Vec<Option<Vec<u8>>>>,
1058 },
1059
1060 FindKeysByPrefix {
1061 id: ApplicationId,
1062 #[debug(with = hex_debug)]
1063 key_prefix: Vec<u8>,
1064 #[debug(skip)]
1065 callback: Sender<Vec<Vec<u8>>>,
1066 },
1067
1068 FindKeyValuesByPrefix {
1069 id: ApplicationId,
1070 #[debug(with = hex_debug)]
1071 key_prefix: Vec<u8>,
1072 #[debug(skip)]
1073 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1074 },
1075
1076 WriteBatch {
1077 id: ApplicationId,
1078 batch: Batch,
1079 #[debug(skip)]
1080 callback: Sender<()>,
1081 },
1082
1083 OpenChain {
1084 ownership: ChainOwnership,
1085 #[debug(skip_if = Amount::is_zero)]
1086 balance: Amount,
1087 parent_id: ChainId,
1088 block_height: BlockHeight,
1089 application_permissions: ApplicationPermissions,
1090 timestamp: Timestamp,
1091 #[debug(skip)]
1092 callback: Sender<ChainId>,
1093 },
1094
1095 CloseChain {
1096 application_id: ApplicationId,
1097 #[debug(skip)]
1098 callback: Sender<Result<(), ExecutionError>>,
1099 },
1100
1101 ChangeApplicationPermissions {
1102 application_id: ApplicationId,
1103 application_permissions: ApplicationPermissions,
1104 #[debug(skip)]
1105 callback: Sender<Result<(), ExecutionError>>,
1106 },
1107
1108 CreateApplication {
1109 chain_id: ChainId,
1110 block_height: BlockHeight,
1111 module_id: ModuleId,
1112 parameters: Vec<u8>,
1113 required_application_ids: Vec<ApplicationId>,
1114 #[debug(skip)]
1115 callback: Sender<CreateApplicationResult>,
1116 },
1117
1118 PerformHttpRequest {
1119 request: http::Request,
1120 http_responses_are_oracle_responses: bool,
1121 #[debug(skip)]
1122 callback: Sender<http::Response>,
1123 },
1124
1125 ReadBlobContent {
1126 blob_id: BlobId,
1127 #[debug(skip)]
1128 callback: Sender<BlobContent>,
1129 },
1130
1131 AssertBlobExists {
1132 blob_id: BlobId,
1133 #[debug(skip)]
1134 callback: Sender<()>,
1135 },
1136
1137 Emit {
1138 stream_id: StreamId,
1139 #[debug(with = hex_debug)]
1140 value: Vec<u8>,
1141 #[debug(skip)]
1142 callback: Sender<u32>,
1143 },
1144
1145 ReadEvent {
1146 event_id: EventId,
1147 callback: oneshot::Sender<Vec<u8>>,
1148 },
1149
1150 SubscribeToEvents {
1151 chain_id: ChainId,
1152 stream_id: StreamId,
1153 subscriber_app_id: ApplicationId,
1154 #[debug(skip)]
1155 callback: Sender<()>,
1156 },
1157
1158 UnsubscribeFromEvents {
1159 chain_id: ChainId,
1160 stream_id: StreamId,
1161 subscriber_app_id: ApplicationId,
1162 #[debug(skip)]
1163 callback: Sender<()>,
1164 },
1165
1166 GetApplicationPermissions {
1167 #[debug(skip)]
1168 callback: Sender<ApplicationPermissions>,
1169 },
1170
1171 QueryServiceOracle {
1172 deadline: Option<Instant>,
1173 application_id: ApplicationId,
1174 next_block_height: BlockHeight,
1175 query: Vec<u8>,
1176 #[debug(skip)]
1177 callback: Sender<Vec<u8>>,
1178 },
1179
1180 AddOutgoingMessage {
1181 message: crate::OutgoingMessage,
1182 #[debug(skip)]
1183 callback: Sender<()>,
1184 },
1185
1186 SetLocalTime {
1187 local_time: Timestamp,
1188 #[debug(skip)]
1189 callback: Sender<()>,
1190 },
1191
1192 AssertBefore {
1193 timestamp: Timestamp,
1194 #[debug(skip)]
1195 callback: Sender<Result<(), ExecutionError>>,
1196 },
1197
1198 AddCreatedBlob {
1199 blob: crate::Blob,
1200 #[debug(skip)]
1201 callback: Sender<()>,
1202 },
1203
1204 ValidationRound {
1205 round: Option<u32>,
1206 #[debug(skip)]
1207 callback: Sender<Option<u32>>,
1208 },
1209}