linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20    time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27    execution::UserAction,
28    runtime::ContractSyncRuntime,
29    system::{CreateApplicationResult, OpenChainConfig},
30    util::{OracleResponseExt as _, RespondExt as _},
31    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeConfig,
32    ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, MessageKind, ModuleId,
33    Operation, OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext,
34    QueryOutcome, ResourceController, SystemMessage, TransactionTracker, UserContractCode,
35    UserServiceCode,
36};
37
/// Actor for handling requests to the execution state.
///
/// The actor does not own any state: it borrows the execution state view, the transaction
/// tracker and the resource controller for the lifetime `'a`, and applies the requests it
/// receives to them.
pub struct ExecutionStateActor<'a, C> {
    /// The execution state view that requests read from and write to.
    state: &'a mut ExecutionStateView<C>,
    /// The tracker that records outgoing messages, events, created blobs and oracle
    /// responses while requests are handled.
    txn_tracker: &'a mut TransactionTracker,
    /// The resource controller used to account for blob reads, event reads and published
    /// events, and to settle balances after a user action has run.
    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
}
44
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers an unlabelled latency histogram named `name` with the given `description`,
    /// using the exponential latency buckets shared by all bytecode-loading metrics.
    fn bytecode_latency_histogram(name: &str, description: &str) -> HistogramVec {
        let buckets = exponential_bucket_latencies(250.0);
        register_histogram_vec(name, description, &[], buckets)
    }

    /// Histogram of the latency to load a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        bytecode_latency_histogram("load_contract_latency", "Load contract latency")
    });

    /// Histogram of the latency to load a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        bytecode_latency_histogram("load_service_latency", "Load service latency")
    });
}
72
/// The sending half of the unbounded channel that carries [`ExecutionRequest`]s to the
/// execution state actor.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
74
75impl<'a, C> ExecutionStateActor<'a, C>
76where
77    C: Context + Clone + Send + Sync + 'static,
78    C::Extra: ExecutionRuntimeContext,
79{
80    /// Creates a new execution state actor.
81    pub fn new(
82        state: &'a mut ExecutionStateView<C>,
83        txn_tracker: &'a mut TransactionTracker,
84        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
85    ) -> Self {
86        Self {
87            state,
88            txn_tracker,
89            resource_controller,
90        }
91    }
92
93    pub(crate) async fn load_contract(
94        &mut self,
95        id: ApplicationId,
96    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
97        #[cfg(with_metrics)]
98        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
99        let blob_id = id.description_blob_id();
100        let description = match self.txn_tracker.get_blob_content(&blob_id) {
101            Some(blob) => bcs::from_bytes(blob.bytes())?,
102            None => {
103                self.state
104                    .system
105                    .describe_application(id, self.txn_tracker)
106                    .await?
107            }
108        };
109        let code = self
110            .state
111            .context()
112            .extra()
113            .get_user_contract(&description, self.txn_tracker)
114            .await?;
115        Ok((code, description))
116    }
117
118    pub(crate) async fn load_service(
119        &mut self,
120        id: ApplicationId,
121    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
122        #[cfg(with_metrics)]
123        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
124        let blob_id = id.description_blob_id();
125        let description = match self.txn_tracker.get_blob_content(&blob_id) {
126            Some(blob) => bcs::from_bytes(blob.bytes())?,
127            None => {
128                self.state
129                    .system
130                    .describe_application(id, self.txn_tracker)
131                    .await?
132            }
133        };
134        let code = self
135            .state
136            .context()
137            .extra()
138            .get_user_service(&description, self.txn_tracker)
139            .await?;
140        Ok((code, description))
141    }
142
143    // TODO(#1416): Support concurrent I/O.
144    pub(crate) async fn handle_request(
145        &mut self,
146        request: ExecutionRequest,
147    ) -> Result<(), ExecutionError> {
148        use ExecutionRequest::*;
149        match request {
150            #[cfg(not(web))]
151            LoadContract { id, callback } => {
152                let (code, description) = self.load_contract(id).await?;
153                callback.respond((code, description))
154            }
155            #[cfg(not(web))]
156            LoadService { id, callback } => {
157                let (code, description) = self.load_service(id).await?;
158                callback.respond((code, description))
159            }
160
161            ChainBalance { callback } => {
162                let balance = *self.state.system.balance.get();
163                callback.respond(balance);
164            }
165
166            OwnerBalance { owner, callback } => {
167                let balance = self
168                    .state
169                    .system
170                    .balances
171                    .get(&owner)
172                    .await?
173                    .unwrap_or_default();
174                callback.respond(balance);
175            }
176
177            OwnerBalances { callback } => {
178                callback.respond(self.state.system.balances.index_values().await?);
179            }
180
181            BalanceOwners { callback } => {
182                let owners = self.state.system.balances.indices().await?;
183                callback.respond(owners);
184            }
185
186            Transfer {
187                source,
188                destination,
189                amount,
190                signer,
191                application_id,
192                callback,
193            } => {
194                let maybe_message = self
195                    .state
196                    .system
197                    .transfer(signer, Some(application_id), source, destination, amount)
198                    .await?;
199                self.txn_tracker.add_outgoing_messages(maybe_message);
200                callback.respond(());
201            }
202
203            Claim {
204                source,
205                destination,
206                amount,
207                signer,
208                application_id,
209                callback,
210            } => {
211                let maybe_message = self
212                    .state
213                    .system
214                    .claim(
215                        signer,
216                        Some(application_id),
217                        source.owner,
218                        source.chain_id,
219                        destination,
220                        amount,
221                    )
222                    .await?;
223                self.txn_tracker.add_outgoing_messages(maybe_message);
224                callback.respond(());
225            }
226
227            SystemTimestamp { callback } => {
228                let timestamp = *self.state.system.timestamp.get();
229                callback.respond(timestamp);
230            }
231
232            ChainOwnership { callback } => {
233                let ownership = self.state.system.ownership.get().clone();
234                callback.respond(ownership);
235            }
236
237            ContainsKey { id, key, callback } => {
238                let view = self.state.users.try_load_entry(&id).await?;
239                let result = match view {
240                    Some(view) => view.contains_key(&key).await?,
241                    None => false,
242                };
243                callback.respond(result);
244            }
245
246            ContainsKeys { id, keys, callback } => {
247                let view = self.state.users.try_load_entry(&id).await?;
248                let result = match view {
249                    Some(view) => view.contains_keys(keys).await?,
250                    None => vec![false; keys.len()],
251                };
252                callback.respond(result);
253            }
254
255            ReadMultiValuesBytes { id, keys, callback } => {
256                let view = self.state.users.try_load_entry(&id).await?;
257                let values = match view {
258                    Some(view) => view.multi_get(keys).await?,
259                    None => vec![None; keys.len()],
260                };
261                callback.respond(values);
262            }
263
264            ReadValueBytes { id, key, callback } => {
265                let view = self.state.users.try_load_entry(&id).await?;
266                let result = match view {
267                    Some(view) => view.get(&key).await?,
268                    None => None,
269                };
270                callback.respond(result);
271            }
272
273            FindKeysByPrefix {
274                id,
275                key_prefix,
276                callback,
277            } => {
278                let view = self.state.users.try_load_entry(&id).await?;
279                let result = match view {
280                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
281                    None => Vec::new(),
282                };
283                callback.respond(result);
284            }
285
286            FindKeyValuesByPrefix {
287                id,
288                key_prefix,
289                callback,
290            } => {
291                let view = self.state.users.try_load_entry(&id).await?;
292                let result = match view {
293                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
294                    None => Vec::new(),
295                };
296                callback.respond(result);
297            }
298
299            WriteBatch {
300                id,
301                batch,
302                callback,
303            } => {
304                let mut view = self.state.users.try_load_entry_mut(&id).await?;
305                view.write_batch(batch).await?;
306                callback.respond(());
307            }
308
309            OpenChain {
310                ownership,
311                balance,
312                parent_id,
313                block_height,
314                application_permissions,
315                timestamp,
316                callback,
317            } => {
318                let config = OpenChainConfig {
319                    ownership,
320                    balance,
321                    application_permissions,
322                };
323                let chain_id = self
324                    .state
325                    .system
326                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
327                    .await?;
328                callback.respond(chain_id);
329            }
330
331            CloseChain {
332                application_id,
333                callback,
334            } => {
335                let app_permissions = self.state.system.application_permissions.get();
336                if !app_permissions.can_close_chain(&application_id) {
337                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
338                } else {
339                    self.state.system.close_chain();
340                    callback.respond(Ok(()));
341                }
342            }
343
344            ChangeApplicationPermissions {
345                application_id,
346                application_permissions,
347                callback,
348            } => {
349                let app_permissions = self.state.system.application_permissions.get();
350                if !app_permissions.can_change_application_permissions(&application_id) {
351                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
352                } else {
353                    self.state
354                        .system
355                        .application_permissions
356                        .set(application_permissions);
357                    callback.respond(Ok(()));
358                }
359            }
360
361            CreateApplication {
362                chain_id,
363                block_height,
364                module_id,
365                parameters,
366                required_application_ids,
367                callback,
368            } => {
369                let create_application_result = self
370                    .state
371                    .system
372                    .create_application(
373                        chain_id,
374                        block_height,
375                        module_id,
376                        parameters,
377                        required_application_ids,
378                        self.txn_tracker,
379                    )
380                    .await?;
381                callback.respond(create_application_result);
382            }
383
384            PerformHttpRequest {
385                request,
386                http_responses_are_oracle_responses,
387                callback,
388            } => {
389                let system = &mut self.state.system;
390                let response = self
391                    .txn_tracker
392                    .oracle(|| async {
393                        let headers = request
394                            .headers
395                            .into_iter()
396                            .map(|http::Header { name, value }| {
397                                Ok((name.parse()?, value.try_into()?))
398                            })
399                            .collect::<Result<HeaderMap, ExecutionError>>()?;
400
401                        let url = Url::parse(&request.url)?;
402                        let host = url
403                            .host_str()
404                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
405
406                        let (_epoch, committee) = system
407                            .current_committee()
408                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
409                        let allowed_hosts = &committee.policy().http_request_allow_list;
410
411                        ensure!(
412                            allowed_hosts.contains(host),
413                            ExecutionError::UnauthorizedHttpRequest(url)
414                        );
415
416                        let request = Client::new()
417                            .request(request.method.into(), url)
418                            .body(request.body)
419                            .headers(headers);
420                        #[cfg(not(web))]
421                        let request = request.timeout(linera_base::time::Duration::from_millis(
422                            committee.policy().http_request_timeout_ms,
423                        ));
424
425                        let response = request.send().await?;
426
427                        let mut response_size_limit =
428                            committee.policy().maximum_http_response_bytes;
429
430                        if http_responses_are_oracle_responses {
431                            response_size_limit = response_size_limit
432                                .min(committee.policy().maximum_oracle_response_bytes);
433                        }
434                        Ok(OracleResponse::Http(
435                            Self::receive_http_response(response, response_size_limit).await?,
436                        ))
437                    })
438                    .await?
439                    .to_http_response()?;
440                callback.respond(response);
441            }
442
443            ReadBlobContent { blob_id, callback } => {
444                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
445                    content.clone()
446                } else {
447                    let content = self.state.system.read_blob_content(blob_id).await?;
448                    if blob_id.blob_type == BlobType::Data {
449                        self.resource_controller
450                            .with_state(&mut self.state.system)
451                            .await?
452                            .track_blob_read(content.bytes().len() as u64)?;
453                    }
454                    self.state
455                        .system
456                        .blob_used(self.txn_tracker, blob_id)
457                        .await?;
458                    content
459                };
460                callback.respond(content)
461            }
462
463            AssertBlobExists { blob_id, callback } => {
464                self.state.system.assert_blob_exists(blob_id).await?;
465                // Treating this as reading a size-0 blob for fee purposes.
466                if blob_id.blob_type == BlobType::Data {
467                    self.resource_controller
468                        .with_state(&mut self.state.system)
469                        .await?
470                        .track_blob_read(0)?;
471                }
472                let is_new = self
473                    .state
474                    .system
475                    .blob_used(self.txn_tracker, blob_id)
476                    .await?;
477                if is_new {
478                    self.txn_tracker
479                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
480                }
481                callback.respond(());
482            }
483
484            Emit {
485                stream_id,
486                value,
487                callback,
488            } => {
489                let count = self
490                    .state
491                    .system
492                    .stream_event_counts
493                    .get_mut_or_default(&stream_id)
494                    .await?;
495                let index = *count;
496                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
497                self.resource_controller
498                    .with_state(&mut self.state.system)
499                    .await?
500                    .track_event_published(&value)?;
501                self.txn_tracker.add_event(stream_id, index, value);
502                callback.respond(index)
503            }
504
505            ReadEvent { event_id, callback } => {
506                let extra = self.state.context().extra();
507                let event = self
508                    .txn_tracker
509                    .oracle(|| async {
510                        let event = extra
511                            .get_event(event_id.clone())
512                            .await?
513                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
514                        Ok(OracleResponse::Event(event_id.clone(), event))
515                    })
516                    .await?
517                    .to_event(&event_id)?;
518                self.resource_controller
519                    .with_state(&mut self.state.system)
520                    .await?
521                    .track_event_read(event.len() as u64)?;
522                callback.respond(event);
523            }
524
525            SubscribeToEvents {
526                chain_id,
527                stream_id,
528                subscriber_app_id,
529                callback,
530            } => {
531                let subscriptions = self
532                    .state
533                    .system
534                    .event_subscriptions
535                    .get_mut_or_default(&(chain_id, stream_id.clone()))
536                    .await?;
537                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
538                    subscriptions.next_index
539                } else {
540                    0
541                };
542                self.txn_tracker.add_stream_to_process(
543                    subscriber_app_id,
544                    chain_id,
545                    stream_id,
546                    0,
547                    next_index,
548                );
549                callback.respond(());
550            }
551
552            UnsubscribeFromEvents {
553                chain_id,
554                stream_id,
555                subscriber_app_id,
556                callback,
557            } => {
558                let key = (chain_id, stream_id.clone());
559                let subscriptions = self
560                    .state
561                    .system
562                    .event_subscriptions
563                    .get_mut_or_default(&key)
564                    .await?;
565                subscriptions.applications.remove(&subscriber_app_id);
566                if subscriptions.applications.is_empty() {
567                    self.state.system.event_subscriptions.remove(&key)?;
568                }
569                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
570                    self.txn_tracker
571                        .remove_stream_to_process(app_id, chain_id, stream_id);
572                }
573                callback.respond(());
574            }
575
576            GetApplicationPermissions { callback } => {
577                let app_permissions = self.state.system.application_permissions.get();
578                callback.respond(app_permissions.clone());
579            }
580
581            QueryServiceOracle {
582                deadline,
583                application_id,
584                next_block_height,
585                query,
586                callback,
587            } => {
588                let state = &mut self.state;
589                let local_time = self.txn_tracker.local_time();
590                let created_blobs = self.txn_tracker.created_blobs().clone();
591                let bytes = self
592                    .txn_tracker
593                    .oracle(|| async {
594                        let context = QueryContext {
595                            chain_id: state.context().extra().chain_id(),
596                            next_block_height,
597                            local_time,
598                        };
599                        let QueryOutcome {
600                            response,
601                            operations,
602                        } = Box::pin(state.query_user_application_with_deadline(
603                            application_id,
604                            context,
605                            query,
606                            deadline,
607                            created_blobs,
608                        ))
609                        .await?;
610                        ensure!(
611                            operations.is_empty(),
612                            ExecutionError::ServiceOracleQueryOperations(operations)
613                        );
614                        Ok(OracleResponse::Service(response))
615                    })
616                    .await?
617                    .to_service_response()?;
618                callback.respond(bytes);
619            }
620
621            AddOutgoingMessage { message, callback } => {
622                self.txn_tracker.add_outgoing_message(message);
623                callback.respond(());
624            }
625
626            SetLocalTime {
627                local_time,
628                callback,
629            } => {
630                self.txn_tracker.set_local_time(local_time);
631                callback.respond(());
632            }
633
634            AssertBefore {
635                timestamp,
636                callback,
637            } => {
638                let result = if !self
639                    .txn_tracker
640                    .replay_oracle_response(OracleResponse::Assert)?
641                {
642                    // There are no recorded oracle responses, so we check the local time.
643                    let local_time = self.txn_tracker.local_time();
644                    if local_time >= timestamp {
645                        Err(ExecutionError::AssertBefore {
646                            timestamp,
647                            local_time,
648                        })
649                    } else {
650                        Ok(())
651                    }
652                } else {
653                    Ok(())
654                };
655                callback.respond(result);
656            }
657
658            AddCreatedBlob { blob, callback } => {
659                self.txn_tracker.add_created_blob(blob);
660                callback.respond(());
661            }
662
663            ValidationRound { round, callback } => {
664                let validation_round = self
665                    .txn_tracker
666                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
667                    .await?
668                    .to_round()?;
669                callback.respond(validation_round);
670            }
671        }
672
673        Ok(())
674    }
675
676    /// Calls `process_streams` for all applications that are subscribed to streams with new
677    /// events or that have new subscriptions.
678    async fn process_subscriptions(
679        &mut self,
680        context: ProcessStreamsContext,
681    ) -> Result<(), ExecutionError> {
682        // Keep track of which streams we have already processed. This is to guard against
683        // applications unsubscribing and subscribing in the process_streams call itself.
684        let mut processed = BTreeSet::new();
685        loop {
686            let to_process = self
687                .txn_tracker
688                .take_streams_to_process()
689                .into_iter()
690                .filter_map(|(app_id, updates)| {
691                    let updates = updates
692                        .into_iter()
693                        .filter_map(|update| {
694                            if !processed.insert((
695                                app_id,
696                                update.chain_id,
697                                update.stream_id.clone(),
698                            )) {
699                                return None;
700                            }
701                            Some(update)
702                        })
703                        .collect::<Vec<_>>();
704                    if updates.is_empty() {
705                        return None;
706                    }
707                    Some((app_id, updates))
708                })
709                .collect::<BTreeMap<_, _>>();
710            if to_process.is_empty() {
711                return Ok(());
712            }
713            for (app_id, updates) in to_process {
714                self.run_user_action(
715                    app_id,
716                    UserAction::ProcessStreams(context, updates),
717                    None,
718                    None,
719                )
720                .await?;
721            }
722        }
723    }
724
725    pub(crate) async fn run_user_action(
726        &mut self,
727        application_id: ApplicationId,
728        action: UserAction,
729        refund_grant_to: Option<Account>,
730        grant: Option<&mut Amount>,
731    ) -> Result<(), ExecutionError> {
732        let ExecutionRuntimeConfig {} = self.state.context().extra().execution_runtime_config();
733        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
734            .await
735    }
736
737    async fn run_user_action_with_runtime(
738        &mut self,
739        application_id: ApplicationId,
740        action: UserAction,
741        refund_grant_to: Option<Account>,
742        grant: Option<&mut Amount>,
743    ) -> Result<(), ExecutionError> {
744        let chain_id = self.state.context().extra().chain_id();
745        let mut cloned_grant = grant.as_ref().map(|x| **x);
746        let initial_balance = self
747            .resource_controller
748            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
749            .await?
750            .balance()?;
751        let controller = ResourceController::new(
752            self.resource_controller.policy().clone(),
753            self.resource_controller.tracker,
754            initial_balance,
755        );
756        let (execution_state_sender, mut execution_state_receiver) =
757            futures::channel::mpsc::unbounded();
758
759        let (code, description) = self.load_contract(application_id).await?;
760
761        let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
762            let runtime = ContractSyncRuntime::new(
763                execution_state_sender,
764                chain_id,
765                refund_grant_to,
766                controller,
767                &action,
768            );
769
770            async move {
771                let code = codes.next().await.expect("we send this immediately below");
772                runtime.preload_contract(application_id, code, description)?;
773                runtime.run_action(application_id, chain_id, action)
774            }
775        })
776        .await;
777
778        contract_runtime_task.send(code)?;
779
780        while let Some(request) = execution_state_receiver.next().await {
781            self.handle_request(request).await?;
782        }
783
784        let (result, controller) = contract_runtime_task.join().await?;
785
786        self.txn_tracker.add_operation_result(result);
787
788        self.resource_controller
789            .with_state_and_grant(&mut self.state.system, grant)
790            .await?
791            .merge_balance(initial_balance, controller.balance()?)?;
792        self.resource_controller.tracker = controller.tracker;
793
794        Ok(())
795    }
796
797    pub async fn execute_operation(
798        &mut self,
799        context: OperationContext,
800        operation: Operation,
801    ) -> Result<(), ExecutionError> {
802        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
803        match operation {
804            Operation::System(op) => {
805                let new_application = self
806                    .state
807                    .system
808                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
809                    .await?;
810                if let Some((application_id, argument)) = new_application {
811                    let user_action = UserAction::Instantiate(context, argument);
812                    self.run_user_action(
813                        application_id,
814                        user_action,
815                        context.refund_grant_to(),
816                        None,
817                    )
818                    .await?;
819                }
820            }
821            Operation::User {
822                application_id,
823                bytes,
824            } => {
825                self.run_user_action(
826                    application_id,
827                    UserAction::Operation(context, bytes),
828                    context.refund_grant_to(),
829                    None,
830                )
831                .await?;
832            }
833        }
834        self.process_subscriptions(context.into()).await?;
835        Ok(())
836    }
837
838    pub async fn execute_message(
839        &mut self,
840        context: MessageContext,
841        message: Message,
842        grant: Option<&mut Amount>,
843    ) -> Result<(), ExecutionError> {
844        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
845        match message {
846            Message::System(message) => {
847                let outcome = self.state.system.execute_message(context, message).await?;
848                self.txn_tracker.add_outgoing_messages(outcome);
849            }
850            Message::User {
851                application_id,
852                bytes,
853            } => {
854                self.run_user_action(
855                    application_id,
856                    UserAction::Message(context, bytes),
857                    context.refund_grant_to,
858                    grant,
859                )
860                .await?;
861            }
862        }
863        self.process_subscriptions(context.into()).await?;
864        Ok(())
865    }
866
867    pub fn bounce_message(
868        &mut self,
869        context: MessageContext,
870        grant: Amount,
871        message: Message,
872    ) -> Result<(), ExecutionError> {
873        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
874        self.txn_tracker.add_outgoing_message(OutgoingMessage {
875            destination: context.origin,
876            authenticated_owner: context.authenticated_owner,
877            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
878            grant,
879            kind: MessageKind::Bouncing,
880            message,
881        });
882        Ok(())
883    }
884
885    pub fn send_refund(
886        &mut self,
887        context: MessageContext,
888        amount: Amount,
889    ) -> Result<(), ExecutionError> {
890        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
891        if amount.is_zero() {
892            return Ok(());
893        }
894        let Some(account) = context.refund_grant_to else {
895            return Err(ExecutionError::InternalError(
896                "Messages with grants should have a non-empty `refund_grant_to`",
897            ));
898        };
899        let message = SystemMessage::Credit {
900            amount,
901            source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
902            target: account.owner,
903        };
904        self.txn_tracker.add_outgoing_message(
905            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
906        );
907        Ok(())
908    }
909
910    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
911    ///
912    /// Ensures that the response does not exceed the provided `size_limit`.
913    async fn receive_http_response(
914        response: reqwest::Response,
915        size_limit: u64,
916    ) -> Result<http::Response, ExecutionError> {
917        let status = response.status().as_u16();
918        let maybe_content_length = response.content_length();
919
920        let headers = response
921            .headers()
922            .iter()
923            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
924            .collect::<Vec<_>>();
925
926        let total_header_size = headers
927            .iter()
928            .map(|header| (header.name.len() + header.value.len()) as u64)
929            .sum();
930
931        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
932            ExecutionError::HttpResponseSizeLimitExceeded {
933                limit: size_limit,
934                size: total_header_size,
935            },
936        )?;
937
938        if let Some(content_length) = maybe_content_length {
939            if content_length > remaining_bytes {
940                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
941                    limit: size_limit,
942                    size: content_length + total_header_size,
943                });
944            }
945        }
946
947        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
948        let mut body_stream = response.bytes_stream();
949
950        while let Some(bytes) = body_stream.next().await.transpose()? {
951            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
952                ExecutionError::HttpResponseSizeLimitExceeded {
953                    limit: size_limit,
954                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
955                },
956            )?;
957
958            body.extend(&bytes);
959        }
960
961        Ok(http::Response {
962            status,
963            headers,
964            body,
965        })
966    }
967}
968
969/// Requests to the execution state.
970#[derive(Debug)]
971pub enum ExecutionRequest {
972    #[cfg(not(web))]
973    LoadContract {
974        id: ApplicationId,
975        #[debug(skip)]
976        callback: Sender<(UserContractCode, ApplicationDescription)>,
977    },
978
979    #[cfg(not(web))]
980    LoadService {
981        id: ApplicationId,
982        #[debug(skip)]
983        callback: Sender<(UserServiceCode, ApplicationDescription)>,
984    },
985
986    ChainBalance {
987        #[debug(skip)]
988        callback: Sender<Amount>,
989    },
990
991    OwnerBalance {
992        owner: AccountOwner,
993        #[debug(skip)]
994        callback: Sender<Amount>,
995    },
996
997    OwnerBalances {
998        #[debug(skip)]
999        callback: Sender<Vec<(AccountOwner, Amount)>>,
1000    },
1001
1002    BalanceOwners {
1003        #[debug(skip)]
1004        callback: Sender<Vec<AccountOwner>>,
1005    },
1006
1007    Transfer {
1008        source: AccountOwner,
1009        destination: Account,
1010        amount: Amount,
1011        #[debug(skip_if = Option::is_none)]
1012        signer: Option<AccountOwner>,
1013        application_id: ApplicationId,
1014        #[debug(skip)]
1015        callback: Sender<()>,
1016    },
1017
1018    Claim {
1019        source: Account,
1020        destination: Account,
1021        amount: Amount,
1022        #[debug(skip_if = Option::is_none)]
1023        signer: Option<AccountOwner>,
1024        application_id: ApplicationId,
1025        #[debug(skip)]
1026        callback: Sender<()>,
1027    },
1028
1029    SystemTimestamp {
1030        #[debug(skip)]
1031        callback: Sender<Timestamp>,
1032    },
1033
1034    ChainOwnership {
1035        #[debug(skip)]
1036        callback: Sender<ChainOwnership>,
1037    },
1038
1039    ReadValueBytes {
1040        id: ApplicationId,
1041        #[debug(with = hex_debug)]
1042        key: Vec<u8>,
1043        #[debug(skip)]
1044        callback: Sender<Option<Vec<u8>>>,
1045    },
1046
1047    ContainsKey {
1048        id: ApplicationId,
1049        key: Vec<u8>,
1050        #[debug(skip)]
1051        callback: Sender<bool>,
1052    },
1053
1054    ContainsKeys {
1055        id: ApplicationId,
1056        #[debug(with = hex_vec_debug)]
1057        keys: Vec<Vec<u8>>,
1058        callback: Sender<Vec<bool>>,
1059    },
1060
1061    ReadMultiValuesBytes {
1062        id: ApplicationId,
1063        #[debug(with = hex_vec_debug)]
1064        keys: Vec<Vec<u8>>,
1065        #[debug(skip)]
1066        callback: Sender<Vec<Option<Vec<u8>>>>,
1067    },
1068
1069    FindKeysByPrefix {
1070        id: ApplicationId,
1071        #[debug(with = hex_debug)]
1072        key_prefix: Vec<u8>,
1073        #[debug(skip)]
1074        callback: Sender<Vec<Vec<u8>>>,
1075    },
1076
1077    FindKeyValuesByPrefix {
1078        id: ApplicationId,
1079        #[debug(with = hex_debug)]
1080        key_prefix: Vec<u8>,
1081        #[debug(skip)]
1082        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1083    },
1084
1085    WriteBatch {
1086        id: ApplicationId,
1087        batch: Batch,
1088        #[debug(skip)]
1089        callback: Sender<()>,
1090    },
1091
1092    OpenChain {
1093        ownership: ChainOwnership,
1094        #[debug(skip_if = Amount::is_zero)]
1095        balance: Amount,
1096        parent_id: ChainId,
1097        block_height: BlockHeight,
1098        application_permissions: ApplicationPermissions,
1099        timestamp: Timestamp,
1100        #[debug(skip)]
1101        callback: Sender<ChainId>,
1102    },
1103
1104    CloseChain {
1105        application_id: ApplicationId,
1106        #[debug(skip)]
1107        callback: Sender<Result<(), ExecutionError>>,
1108    },
1109
1110    ChangeApplicationPermissions {
1111        application_id: ApplicationId,
1112        application_permissions: ApplicationPermissions,
1113        #[debug(skip)]
1114        callback: Sender<Result<(), ExecutionError>>,
1115    },
1116
1117    CreateApplication {
1118        chain_id: ChainId,
1119        block_height: BlockHeight,
1120        module_id: ModuleId,
1121        parameters: Vec<u8>,
1122        required_application_ids: Vec<ApplicationId>,
1123        #[debug(skip)]
1124        callback: Sender<CreateApplicationResult>,
1125    },
1126
1127    PerformHttpRequest {
1128        request: http::Request,
1129        http_responses_are_oracle_responses: bool,
1130        #[debug(skip)]
1131        callback: Sender<http::Response>,
1132    },
1133
1134    ReadBlobContent {
1135        blob_id: BlobId,
1136        #[debug(skip)]
1137        callback: Sender<BlobContent>,
1138    },
1139
1140    AssertBlobExists {
1141        blob_id: BlobId,
1142        #[debug(skip)]
1143        callback: Sender<()>,
1144    },
1145
1146    Emit {
1147        stream_id: StreamId,
1148        #[debug(with = hex_debug)]
1149        value: Vec<u8>,
1150        #[debug(skip)]
1151        callback: Sender<u32>,
1152    },
1153
1154    ReadEvent {
1155        event_id: EventId,
1156        callback: oneshot::Sender<Vec<u8>>,
1157    },
1158
1159    SubscribeToEvents {
1160        chain_id: ChainId,
1161        stream_id: StreamId,
1162        subscriber_app_id: ApplicationId,
1163        #[debug(skip)]
1164        callback: Sender<()>,
1165    },
1166
1167    UnsubscribeFromEvents {
1168        chain_id: ChainId,
1169        stream_id: StreamId,
1170        subscriber_app_id: ApplicationId,
1171        #[debug(skip)]
1172        callback: Sender<()>,
1173    },
1174
1175    GetApplicationPermissions {
1176        #[debug(skip)]
1177        callback: Sender<ApplicationPermissions>,
1178    },
1179
1180    QueryServiceOracle {
1181        deadline: Option<Instant>,
1182        application_id: ApplicationId,
1183        next_block_height: BlockHeight,
1184        query: Vec<u8>,
1185        #[debug(skip)]
1186        callback: Sender<Vec<u8>>,
1187    },
1188
1189    AddOutgoingMessage {
1190        message: crate::OutgoingMessage,
1191        #[debug(skip)]
1192        callback: Sender<()>,
1193    },
1194
1195    SetLocalTime {
1196        local_time: Timestamp,
1197        #[debug(skip)]
1198        callback: Sender<()>,
1199    },
1200
1201    AssertBefore {
1202        timestamp: Timestamp,
1203        #[debug(skip)]
1204        callback: Sender<Result<(), ExecutionError>>,
1205    },
1206
1207    AddCreatedBlob {
1208        blob: crate::Blob,
1209        #[debug(skip)]
1210        callback: Sender<()>,
1211    },
1212
1213    ValidationRound {
1214        round: Option<u32>,
1215        #[debug(skip)]
1216        callback: Sender<Option<u32>>,
1217    },
1218}