linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20    time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27    execution::UserAction,
28    runtime::ContractSyncRuntime,
29    system::{CreateApplicationResult, OpenChainConfig},
30    util::{OracleResponseExt as _, RespondExt as _},
31    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeConfig,
32    ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, MessageKind, ModuleId,
33    Operation, OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext,
34    QueryOutcome, ResourceController, SystemMessage, TransactionTracker, UserContractCode,
35    UserServiceCode,
36};
37
38/// Actor for handling requests to the execution state.
39pub struct ExecutionStateActor<'a, C> {
40    state: &'a mut ExecutionStateView<C>,
41    txn_tracker: &'a mut TransactionTracker,
42    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
43}
44
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Latency histogram for loading a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_latencies(250.0);
        // No labels are attached to this histogram.
        register_histogram_vec(
            "load_contract_latency",
            "Load contract latency",
            &[],
            buckets,
        )
    });

    /// Latency histogram for loading a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_latencies(250.0);
        // No labels are attached to this histogram.
        register_histogram_vec(
            "load_service_latency",
            "Load service latency",
            &[],
            buckets,
        )
    });
}
72
73pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77    C: Context + Clone + Send + Sync + 'static,
78    C::Extra: ExecutionRuntimeContext,
79{
80    /// Creates a new execution state actor.
81    pub fn new(
82        state: &'a mut ExecutionStateView<C>,
83        txn_tracker: &'a mut TransactionTracker,
84        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85    ) -> Self {
86        Self {
87            state,
88            txn_tracker,
89            resource_controller,
90        }
91    }
92
93    pub(crate) async fn load_contract(
94        &mut self,
95        id: ApplicationId,
96    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
97        #[cfg(with_metrics)]
98        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
99        let blob_id = id.description_blob_id();
100        let description = match self.txn_tracker.get_blob_content(&blob_id) {
101            Some(blob) => bcs::from_bytes(blob.bytes())?,
102            None => {
103                self.state
104                    .system
105                    .describe_application(id, self.txn_tracker)
106                    .await?
107            }
108        };
109        let code = self
110            .state
111            .context()
112            .extra()
113            .get_user_contract(&description, self.txn_tracker)
114            .await?;
115        Ok((code, description))
116    }
117
118    pub(crate) async fn load_service(
119        &mut self,
120        id: ApplicationId,
121    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
122        #[cfg(with_metrics)]
123        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
124        let blob_id = id.description_blob_id();
125        let description = match self.txn_tracker.get_blob_content(&blob_id) {
126            Some(blob) => bcs::from_bytes(blob.bytes())?,
127            None => {
128                self.state
129                    .system
130                    .describe_application(id, self.txn_tracker)
131                    .await?
132            }
133        };
134        let code = self
135            .state
136            .context()
137            .extra()
138            .get_user_service(&description, self.txn_tracker)
139            .await?;
140        Ok((code, description))
141    }
142
143    // TODO(#1416): Support concurrent I/O.
144    pub(crate) async fn handle_request(
145        &mut self,
146        request: ExecutionRequest,
147    ) -> Result<(), ExecutionError> {
148        use ExecutionRequest::*;
149        match request {
150            #[cfg(not(web))]
151            LoadContract { id, callback } => {
152                let (code, description) = self.load_contract(id).await?;
153                callback.respond((code, description))
154            }
155            #[cfg(not(web))]
156            LoadService { id, callback } => {
157                let (code, description) = self.load_service(id).await?;
158                callback.respond((code, description))
159            }
160
161            ChainBalance { callback } => {
162                let balance = *self.state.system.balance.get();
163                callback.respond(balance);
164            }
165
166            OwnerBalance { owner, callback } => {
167                let balance = self
168                    .state
169                    .system
170                    .balances
171                    .get(&owner)
172                    .await?
173                    .unwrap_or_default();
174                callback.respond(balance);
175            }
176
177            OwnerBalances { callback } => {
178                callback.respond(self.state.system.balances.index_values().await?);
179            }
180
181            BalanceOwners { callback } => {
182                let owners = self.state.system.balances.indices().await?;
183                callback.respond(owners);
184            }
185
186            Transfer {
187                source,
188                destination,
189                amount,
190                signer,
191                application_id,
192                callback,
193            } => {
194                let maybe_message = self
195                    .state
196                    .system
197                    .transfer(signer, Some(application_id), source, destination, amount)
198                    .await?;
199                self.txn_tracker.add_outgoing_messages(maybe_message);
200                callback.respond(());
201            }
202
203            Claim {
204                source,
205                destination,
206                amount,
207                signer,
208                application_id,
209                callback,
210            } => {
211                let maybe_message = self
212                    .state
213                    .system
214                    .claim(
215                        signer,
216                        Some(application_id),
217                        source.owner,
218                        source.chain_id,
219                        destination,
220                        amount,
221                    )
222                    .await?;
223                self.txn_tracker.add_outgoing_messages(maybe_message);
224                callback.respond(());
225            }
226
227            SystemTimestamp { callback } => {
228                let timestamp = *self.state.system.timestamp.get();
229                callback.respond(timestamp);
230            }
231
232            ChainOwnership { callback } => {
233                let ownership = self.state.system.ownership.get().clone();
234                callback.respond(ownership);
235            }
236
237            ContainsKey { id, key, callback } => {
238                let view = self.state.users.try_load_entry(&id).await?;
239                let result = match view {
240                    Some(view) => view.contains_key(&key).await?,
241                    None => false,
242                };
243                callback.respond(result);
244            }
245
246            ContainsKeys { id, keys, callback } => {
247                let view = self.state.users.try_load_entry(&id).await?;
248                let result = match view {
249                    Some(view) => view.contains_keys(keys).await?,
250                    None => vec![false; keys.len()],
251                };
252                callback.respond(result);
253            }
254
255            ReadMultiValuesBytes { id, keys, callback } => {
256                let view = self.state.users.try_load_entry(&id).await?;
257                let values = match view {
258                    Some(view) => view.multi_get(keys).await?,
259                    None => vec![None; keys.len()],
260                };
261                callback.respond(values);
262            }
263
264            ReadValueBytes { id, key, callback } => {
265                let view = self.state.users.try_load_entry(&id).await?;
266                let result = match view {
267                    Some(view) => view.get(&key).await?,
268                    None => None,
269                };
270                callback.respond(result);
271            }
272
273            FindKeysByPrefix {
274                id,
275                key_prefix,
276                callback,
277            } => {
278                let view = self.state.users.try_load_entry(&id).await?;
279                let result = match view {
280                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
281                    None => Vec::new(),
282                };
283                callback.respond(result);
284            }
285
286            FindKeyValuesByPrefix {
287                id,
288                key_prefix,
289                callback,
290            } => {
291                let view = self.state.users.try_load_entry(&id).await?;
292                let result = match view {
293                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
294                    None => Vec::new(),
295                };
296                callback.respond(result);
297            }
298
299            WriteBatch {
300                id,
301                batch,
302                callback,
303            } => {
304                let mut view = self.state.users.try_load_entry_mut(&id).await?;
305                view.write_batch(batch).await?;
306                callback.respond(());
307            }
308
309            OpenChain {
310                ownership,
311                balance,
312                parent_id,
313                block_height,
314                application_permissions,
315                timestamp,
316                callback,
317            } => {
318                let config = OpenChainConfig {
319                    ownership,
320                    balance,
321                    application_permissions,
322                };
323                let chain_id = self
324                    .state
325                    .system
326                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
327                    .await?;
328                callback.respond(chain_id);
329            }
330
331            CloseChain {
332                application_id,
333                callback,
334            } => {
335                let app_permissions = self.state.system.application_permissions.get();
336                if !app_permissions.can_close_chain(&application_id) {
337                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
338                } else {
339                    self.state.system.close_chain();
340                    callback.respond(Ok(()));
341                }
342            }
343
344            ChangeApplicationPermissions {
345                application_id,
346                application_permissions,
347                callback,
348            } => {
349                let app_permissions = self.state.system.application_permissions.get();
350                if !app_permissions.can_change_application_permissions(&application_id) {
351                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
352                } else {
353                    self.state
354                        .system
355                        .application_permissions
356                        .set(application_permissions);
357                    callback.respond(Ok(()));
358                }
359            }
360
361            CreateApplication {
362                chain_id,
363                block_height,
364                module_id,
365                parameters,
366                required_application_ids,
367                callback,
368            } => {
369                let create_application_result = self
370                    .state
371                    .system
372                    .create_application(
373                        chain_id,
374                        block_height,
375                        module_id,
376                        parameters,
377                        required_application_ids,
378                        self.txn_tracker,
379                    )
380                    .await?;
381                callback.respond(create_application_result);
382            }
383
384            PerformHttpRequest {
385                request,
386                http_responses_are_oracle_responses,
387                callback,
388            } => {
389                let system = &mut self.state.system;
390                let response = self
391                    .txn_tracker
392                    .oracle(|| async {
393                        let headers = request
394                            .headers
395                            .into_iter()
396                            .map(|http::Header { name, value }| {
397                                Ok((name.parse()?, value.try_into()?))
398                            })
399                            .collect::<Result<HeaderMap, ExecutionError>>()?;
400
401                        let url = Url::parse(&request.url)?;
402                        let host = url
403                            .host_str()
404                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
405
406                        let (_epoch, committee) = system
407                            .current_committee()
408                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
409                        let allowed_hosts = &committee.policy().http_request_allow_list;
410
411                        ensure!(
412                            allowed_hosts.contains(host),
413                            ExecutionError::UnauthorizedHttpRequest(url)
414                        );
415
416                        let request = Client::new()
417                            .request(request.method.into(), url)
418                            .body(request.body)
419                            .headers(headers);
420                        #[cfg(not(web))]
421                        let request = request.timeout(linera_base::time::Duration::from_millis(
422                            committee.policy().http_request_timeout_ms,
423                        ));
424
425                        let response = request.send().await?;
426
427                        let mut response_size_limit =
428                            committee.policy().maximum_http_response_bytes;
429
430                        if http_responses_are_oracle_responses {
431                            response_size_limit = response_size_limit
432                                .min(committee.policy().maximum_oracle_response_bytes);
433                        }
434                        Ok(OracleResponse::Http(
435                            Self::receive_http_response(response, response_size_limit).await?,
436                        ))
437                    })
438                    .await?
439                    .to_http_response()?;
440                callback.respond(response);
441            }
442
443            ReadBlobContent { blob_id, callback } => {
444                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
445                    content.clone()
446                } else {
447                    let content = self.state.system.read_blob_content(blob_id).await?;
448                    if blob_id.blob_type == BlobType::Data {
449                        self.resource_controller
450                            .with_state(&mut self.state.system)
451                            .await?
452                            .track_blob_read(content.bytes().len() as u64)?;
453                    }
454                    self.state
455                        .system
456                        .blob_used(self.txn_tracker, blob_id)
457                        .await?;
458                    content
459                };
460                callback.respond(content)
461            }
462
463            AssertBlobExists { blob_id, callback } => {
464                self.state.system.assert_blob_exists(blob_id).await?;
465                // Treating this as reading a size-0 blob for fee purposes.
466                if blob_id.blob_type == BlobType::Data {
467                    self.resource_controller
468                        .with_state(&mut self.state.system)
469                        .await?
470                        .track_blob_read(0)?;
471                }
472                let is_new = self
473                    .state
474                    .system
475                    .blob_used(self.txn_tracker, blob_id)
476                    .await?;
477                if is_new {
478                    self.txn_tracker
479                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
480                }
481                callback.respond(());
482            }
483
484            Emit {
485                stream_id,
486                value,
487                callback,
488            } => {
489                let count = self
490                    .state
491                    .stream_event_counts
492                    .get_mut_or_default(&stream_id)
493                    .await?;
494                let index = *count;
495                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
496                self.txn_tracker.add_event(stream_id, index, value);
497                callback.respond(index)
498            }
499
500            ReadEvent { event_id, callback } => {
501                let extra = self.state.context().extra();
502                let event = self
503                    .txn_tracker
504                    .oracle(|| async {
505                        let event = extra
506                            .get_event(event_id.clone())
507                            .await?
508                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
509                        Ok(OracleResponse::Event(event_id.clone(), event))
510                    })
511                    .await?
512                    .to_event(&event_id)?;
513                callback.respond(event);
514            }
515
516            SubscribeToEvents {
517                chain_id,
518                stream_id,
519                subscriber_app_id,
520                callback,
521            } => {
522                let subscriptions = self
523                    .state
524                    .system
525                    .event_subscriptions
526                    .get_mut_or_default(&(chain_id, stream_id.clone()))
527                    .await?;
528                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
529                    subscriptions.next_index
530                } else {
531                    0
532                };
533                self.txn_tracker.add_stream_to_process(
534                    subscriber_app_id,
535                    chain_id,
536                    stream_id,
537                    0,
538                    next_index,
539                );
540                callback.respond(());
541            }
542
543            UnsubscribeFromEvents {
544                chain_id,
545                stream_id,
546                subscriber_app_id,
547                callback,
548            } => {
549                let key = (chain_id, stream_id.clone());
550                let subscriptions = self
551                    .state
552                    .system
553                    .event_subscriptions
554                    .get_mut_or_default(&key)
555                    .await?;
556                subscriptions.applications.remove(&subscriber_app_id);
557                if subscriptions.applications.is_empty() {
558                    self.state.system.event_subscriptions.remove(&key)?;
559                }
560                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
561                    self.txn_tracker
562                        .remove_stream_to_process(app_id, chain_id, stream_id);
563                }
564                callback.respond(());
565            }
566
567            GetApplicationPermissions { callback } => {
568                let app_permissions = self.state.system.application_permissions.get();
569                callback.respond(app_permissions.clone());
570            }
571
572            QueryServiceOracle {
573                deadline,
574                application_id,
575                next_block_height,
576                query,
577                callback,
578            } => {
579                let state = &mut self.state;
580                let local_time = self.txn_tracker.local_time();
581                let created_blobs = self.txn_tracker.created_blobs().clone();
582                let bytes = self
583                    .txn_tracker
584                    .oracle(|| async {
585                        let context = QueryContext {
586                            chain_id: state.context().extra().chain_id(),
587                            next_block_height,
588                            local_time,
589                        };
590                        let QueryOutcome {
591                            response,
592                            operations,
593                        } = Box::pin(state.query_user_application_with_deadline(
594                            application_id,
595                            context,
596                            query,
597                            deadline,
598                            created_blobs,
599                        ))
600                        .await?;
601                        ensure!(
602                            operations.is_empty(),
603                            ExecutionError::ServiceOracleQueryOperations(operations)
604                        );
605                        Ok(OracleResponse::Service(response))
606                    })
607                    .await?
608                    .to_service_response()?;
609                callback.respond(bytes);
610            }
611
612            AddOutgoingMessage { message, callback } => {
613                self.txn_tracker.add_outgoing_message(message);
614                callback.respond(());
615            }
616
617            SetLocalTime {
618                local_time,
619                callback,
620            } => {
621                self.txn_tracker.set_local_time(local_time);
622                callback.respond(());
623            }
624
625            AssertBefore {
626                timestamp,
627                callback,
628            } => {
629                let result = if !self
630                    .txn_tracker
631                    .replay_oracle_response(OracleResponse::Assert)?
632                {
633                    // There are no recorded oracle responses, so we check the local time.
634                    let local_time = self.txn_tracker.local_time();
635                    if local_time >= timestamp {
636                        Err(ExecutionError::AssertBefore {
637                            timestamp,
638                            local_time,
639                        })
640                    } else {
641                        Ok(())
642                    }
643                } else {
644                    Ok(())
645                };
646                callback.respond(result);
647            }
648
649            AddCreatedBlob { blob, callback } => {
650                self.txn_tracker.add_created_blob(blob);
651                callback.respond(());
652            }
653
654            ValidationRound { round, callback } => {
655                let validation_round = self
656                    .txn_tracker
657                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
658                    .await?
659                    .to_round()?;
660                callback.respond(validation_round);
661            }
662        }
663
664        Ok(())
665    }
666
667    /// Calls `process_streams` for all applications that are subscribed to streams with new
668    /// events or that have new subscriptions.
669    async fn process_subscriptions(
670        &mut self,
671        context: ProcessStreamsContext,
672    ) -> Result<(), ExecutionError> {
673        // Keep track of which streams we have already processed. This is to guard against
674        // applications unsubscribing and subscribing in the process_streams call itself.
675        let mut processed = BTreeSet::new();
676        loop {
677            let to_process = self
678                .txn_tracker
679                .take_streams_to_process()
680                .into_iter()
681                .filter_map(|(app_id, updates)| {
682                    let updates = updates
683                        .into_iter()
684                        .filter_map(|update| {
685                            if !processed.insert((
686                                app_id,
687                                update.chain_id,
688                                update.stream_id.clone(),
689                            )) {
690                                return None;
691                            }
692                            Some(update)
693                        })
694                        .collect::<Vec<_>>();
695                    if updates.is_empty() {
696                        return None;
697                    }
698                    Some((app_id, updates))
699                })
700                .collect::<BTreeMap<_, _>>();
701            if to_process.is_empty() {
702                return Ok(());
703            }
704            for (app_id, updates) in to_process {
705                self.run_user_action(
706                    app_id,
707                    UserAction::ProcessStreams(context, updates),
708                    None,
709                    None,
710                )
711                .await?;
712            }
713        }
714    }
715
716    pub(crate) async fn run_user_action(
717        &mut self,
718        application_id: ApplicationId,
719        action: UserAction,
720        refund_grant_to: Option<Account>,
721        grant: Option<&mut Amount>,
722    ) -> Result<(), ExecutionError> {
723        let ExecutionRuntimeConfig {} = self.state.context().extra().execution_runtime_config();
724        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
725            .await
726    }
727
728    async fn run_user_action_with_runtime(
729        &mut self,
730        application_id: ApplicationId,
731        action: UserAction,
732        refund_grant_to: Option<Account>,
733        grant: Option<&mut Amount>,
734    ) -> Result<(), ExecutionError> {
735        let chain_id = self.state.context().extra().chain_id();
736        let mut cloned_grant = grant.as_ref().map(|x| **x);
737        let initial_balance = self
738            .resource_controller
739            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
740            .await?
741            .balance()?;
742        let controller = ResourceController::new(
743            self.resource_controller.policy().clone(),
744            self.resource_controller.tracker,
745            initial_balance,
746        );
747        let (execution_state_sender, mut execution_state_receiver) =
748            futures::channel::mpsc::unbounded();
749
750        let (code, description) = self.load_contract(application_id).await?;
751
752        let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
753            let runtime = ContractSyncRuntime::new(
754                execution_state_sender,
755                chain_id,
756                refund_grant_to,
757                controller,
758                &action,
759            );
760
761            async move {
762                let code = codes.next().await.expect("we send this immediately below");
763                runtime.preload_contract(application_id, code, description)?;
764                runtime.run_action(application_id, chain_id, action)
765            }
766        })
767        .await;
768
769        contract_runtime_task.send(code)?;
770
771        while let Some(request) = execution_state_receiver.next().await {
772            self.handle_request(request).await?;
773        }
774
775        let (result, controller) = contract_runtime_task.join().await?;
776
777        self.txn_tracker.add_operation_result(result);
778
779        self.resource_controller
780            .with_state_and_grant(&mut self.state.system, grant)
781            .await?
782            .merge_balance(initial_balance, controller.balance()?)?;
783        self.resource_controller.tracker = controller.tracker;
784
785        Ok(())
786    }
787
788    pub async fn execute_operation(
789        &mut self,
790        context: OperationContext,
791        operation: Operation,
792    ) -> Result<(), ExecutionError> {
793        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
794        match operation {
795            Operation::System(op) => {
796                let new_application = self
797                    .state
798                    .system
799                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
800                    .await?;
801                if let Some((application_id, argument)) = new_application {
802                    let user_action = UserAction::Instantiate(context, argument);
803                    self.run_user_action(
804                        application_id,
805                        user_action,
806                        context.refund_grant_to(),
807                        None,
808                    )
809                    .await?;
810                }
811            }
812            Operation::User {
813                application_id,
814                bytes,
815            } => {
816                self.run_user_action(
817                    application_id,
818                    UserAction::Operation(context, bytes),
819                    context.refund_grant_to(),
820                    None,
821                )
822                .await?;
823            }
824        }
825        self.process_subscriptions(context.into()).await?;
826        Ok(())
827    }
828
829    pub async fn execute_message(
830        &mut self,
831        context: MessageContext,
832        message: Message,
833        grant: Option<&mut Amount>,
834    ) -> Result<(), ExecutionError> {
835        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
836        match message {
837            Message::System(message) => {
838                let outcome = self.state.system.execute_message(context, message).await?;
839                self.txn_tracker.add_outgoing_messages(outcome);
840            }
841            Message::User {
842                application_id,
843                bytes,
844            } => {
845                self.run_user_action(
846                    application_id,
847                    UserAction::Message(context, bytes),
848                    context.refund_grant_to,
849                    grant,
850                )
851                .await?;
852            }
853        }
854        self.process_subscriptions(context.into()).await?;
855        Ok(())
856    }
857
858    pub fn bounce_message(
859        &mut self,
860        context: MessageContext,
861        grant: Amount,
862        message: Message,
863    ) -> Result<(), ExecutionError> {
864        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
865        self.txn_tracker.add_outgoing_message(OutgoingMessage {
866            destination: context.origin,
867            authenticated_signer: context.authenticated_signer,
868            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
869            grant,
870            kind: MessageKind::Bouncing,
871            message,
872        });
873        Ok(())
874    }
875
876    pub fn send_refund(
877        &mut self,
878        context: MessageContext,
879        amount: Amount,
880    ) -> Result<(), ExecutionError> {
881        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
882        if amount.is_zero() {
883            return Ok(());
884        }
885        let Some(account) = context.refund_grant_to else {
886            return Err(ExecutionError::InternalError(
887                "Messages with grants should have a non-empty `refund_grant_to`",
888            ));
889        };
890        let message = SystemMessage::Credit {
891            amount,
892            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
893            target: account.owner,
894        };
895        self.txn_tracker.add_outgoing_message(
896            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
897        );
898        Ok(())
899    }
900
901    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
902    ///
903    /// Ensures that the response does not exceed the provided `size_limit`.
904    async fn receive_http_response(
905        response: reqwest::Response,
906        size_limit: u64,
907    ) -> Result<http::Response, ExecutionError> {
908        let status = response.status().as_u16();
909        let maybe_content_length = response.content_length();
910
911        let headers = response
912            .headers()
913            .iter()
914            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
915            .collect::<Vec<_>>();
916
917        let total_header_size = headers
918            .iter()
919            .map(|header| (header.name.len() + header.value.len()) as u64)
920            .sum();
921
922        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
923            ExecutionError::HttpResponseSizeLimitExceeded {
924                limit: size_limit,
925                size: total_header_size,
926            },
927        )?;
928
929        if let Some(content_length) = maybe_content_length {
930            if content_length > remaining_bytes {
931                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
932                    limit: size_limit,
933                    size: content_length + total_header_size,
934                });
935            }
936        }
937
938        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
939        let mut body_stream = response.bytes_stream();
940
941        while let Some(bytes) = body_stream.next().await.transpose()? {
942            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
943                ExecutionError::HttpResponseSizeLimitExceeded {
944                    limit: size_limit,
945                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
946                },
947            )?;
948
949            body.extend(&bytes);
950        }
951
952        Ok(http::Response {
953            status,
954            headers,
955            body,
956        })
957    }
958}
959
960/// Requests to the execution state.
961#[derive(Debug)]
962pub enum ExecutionRequest {
963    #[cfg(not(web))]
964    LoadContract {
965        id: ApplicationId,
966        #[debug(skip)]
967        callback: Sender<(UserContractCode, ApplicationDescription)>,
968    },
969
970    #[cfg(not(web))]
971    LoadService {
972        id: ApplicationId,
973        #[debug(skip)]
974        callback: Sender<(UserServiceCode, ApplicationDescription)>,
975    },
976
977    ChainBalance {
978        #[debug(skip)]
979        callback: Sender<Amount>,
980    },
981
982    OwnerBalance {
983        owner: AccountOwner,
984        #[debug(skip)]
985        callback: Sender<Amount>,
986    },
987
988    OwnerBalances {
989        #[debug(skip)]
990        callback: Sender<Vec<(AccountOwner, Amount)>>,
991    },
992
993    BalanceOwners {
994        #[debug(skip)]
995        callback: Sender<Vec<AccountOwner>>,
996    },
997
998    Transfer {
999        source: AccountOwner,
1000        destination: Account,
1001        amount: Amount,
1002        #[debug(skip_if = Option::is_none)]
1003        signer: Option<AccountOwner>,
1004        application_id: ApplicationId,
1005        #[debug(skip)]
1006        callback: Sender<()>,
1007    },
1008
1009    Claim {
1010        source: Account,
1011        destination: Account,
1012        amount: Amount,
1013        #[debug(skip_if = Option::is_none)]
1014        signer: Option<AccountOwner>,
1015        application_id: ApplicationId,
1016        #[debug(skip)]
1017        callback: Sender<()>,
1018    },
1019
1020    SystemTimestamp {
1021        #[debug(skip)]
1022        callback: Sender<Timestamp>,
1023    },
1024
1025    ChainOwnership {
1026        #[debug(skip)]
1027        callback: Sender<ChainOwnership>,
1028    },
1029
1030    ReadValueBytes {
1031        id: ApplicationId,
1032        #[debug(with = hex_debug)]
1033        key: Vec<u8>,
1034        #[debug(skip)]
1035        callback: Sender<Option<Vec<u8>>>,
1036    },
1037
1038    ContainsKey {
1039        id: ApplicationId,
1040        key: Vec<u8>,
1041        #[debug(skip)]
1042        callback: Sender<bool>,
1043    },
1044
1045    ContainsKeys {
1046        id: ApplicationId,
1047        #[debug(with = hex_vec_debug)]
1048        keys: Vec<Vec<u8>>,
1049        callback: Sender<Vec<bool>>,
1050    },
1051
1052    ReadMultiValuesBytes {
1053        id: ApplicationId,
1054        #[debug(with = hex_vec_debug)]
1055        keys: Vec<Vec<u8>>,
1056        #[debug(skip)]
1057        callback: Sender<Vec<Option<Vec<u8>>>>,
1058    },
1059
1060    FindKeysByPrefix {
1061        id: ApplicationId,
1062        #[debug(with = hex_debug)]
1063        key_prefix: Vec<u8>,
1064        #[debug(skip)]
1065        callback: Sender<Vec<Vec<u8>>>,
1066    },
1067
1068    FindKeyValuesByPrefix {
1069        id: ApplicationId,
1070        #[debug(with = hex_debug)]
1071        key_prefix: Vec<u8>,
1072        #[debug(skip)]
1073        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1074    },
1075
1076    WriteBatch {
1077        id: ApplicationId,
1078        batch: Batch,
1079        #[debug(skip)]
1080        callback: Sender<()>,
1081    },
1082
1083    OpenChain {
1084        ownership: ChainOwnership,
1085        #[debug(skip_if = Amount::is_zero)]
1086        balance: Amount,
1087        parent_id: ChainId,
1088        block_height: BlockHeight,
1089        application_permissions: ApplicationPermissions,
1090        timestamp: Timestamp,
1091        #[debug(skip)]
1092        callback: Sender<ChainId>,
1093    },
1094
1095    CloseChain {
1096        application_id: ApplicationId,
1097        #[debug(skip)]
1098        callback: Sender<Result<(), ExecutionError>>,
1099    },
1100
1101    ChangeApplicationPermissions {
1102        application_id: ApplicationId,
1103        application_permissions: ApplicationPermissions,
1104        #[debug(skip)]
1105        callback: Sender<Result<(), ExecutionError>>,
1106    },
1107
1108    CreateApplication {
1109        chain_id: ChainId,
1110        block_height: BlockHeight,
1111        module_id: ModuleId,
1112        parameters: Vec<u8>,
1113        required_application_ids: Vec<ApplicationId>,
1114        #[debug(skip)]
1115        callback: Sender<CreateApplicationResult>,
1116    },
1117
1118    PerformHttpRequest {
1119        request: http::Request,
1120        http_responses_are_oracle_responses: bool,
1121        #[debug(skip)]
1122        callback: Sender<http::Response>,
1123    },
1124
1125    ReadBlobContent {
1126        blob_id: BlobId,
1127        #[debug(skip)]
1128        callback: Sender<BlobContent>,
1129    },
1130
1131    AssertBlobExists {
1132        blob_id: BlobId,
1133        #[debug(skip)]
1134        callback: Sender<()>,
1135    },
1136
1137    Emit {
1138        stream_id: StreamId,
1139        #[debug(with = hex_debug)]
1140        value: Vec<u8>,
1141        #[debug(skip)]
1142        callback: Sender<u32>,
1143    },
1144
1145    ReadEvent {
1146        event_id: EventId,
1147        callback: oneshot::Sender<Vec<u8>>,
1148    },
1149
1150    SubscribeToEvents {
1151        chain_id: ChainId,
1152        stream_id: StreamId,
1153        subscriber_app_id: ApplicationId,
1154        #[debug(skip)]
1155        callback: Sender<()>,
1156    },
1157
1158    UnsubscribeFromEvents {
1159        chain_id: ChainId,
1160        stream_id: StreamId,
1161        subscriber_app_id: ApplicationId,
1162        #[debug(skip)]
1163        callback: Sender<()>,
1164    },
1165
1166    GetApplicationPermissions {
1167        #[debug(skip)]
1168        callback: Sender<ApplicationPermissions>,
1169    },
1170
1171    QueryServiceOracle {
1172        deadline: Option<Instant>,
1173        application_id: ApplicationId,
1174        next_block_height: BlockHeight,
1175        query: Vec<u8>,
1176        #[debug(skip)]
1177        callback: Sender<Vec<u8>>,
1178    },
1179
1180    AddOutgoingMessage {
1181        message: crate::OutgoingMessage,
1182        #[debug(skip)]
1183        callback: Sender<()>,
1184    },
1185
1186    SetLocalTime {
1187        local_time: Timestamp,
1188        #[debug(skip)]
1189        callback: Sender<()>,
1190    },
1191
1192    AssertBefore {
1193        timestamp: Timestamp,
1194        #[debug(skip)]
1195        callback: Sender<Result<(), ExecutionError>>,
1196    },
1197
1198    AddCreatedBlob {
1199        blob: crate::Blob,
1200        #[debug(skip)]
1201        callback: Sender<()>,
1202    },
1203
1204    ValidationRound {
1205        round: Option<u32>,
1206        #[debug(skip)]
1207        callback: Sender<Option<u32>>,
1208    },
1209}