linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20    time::Instant,
21};
22use linera_views::{batch::Batch, context::Context, views::View};
23use oneshot::Sender;
24use reqwest::{header::HeaderMap, Client, Url};
25
26use crate::{
27    execution::UserAction,
28    runtime::ContractSyncRuntime,
29    system::{CreateApplicationResult, OpenChainConfig},
30    util::{OracleResponseExt as _, RespondExt as _},
31    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
32    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
33    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
34    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
35};
36
37/// Actor for handling requests to the execution state.
38pub struct ExecutionStateActor<'a, C> {
39    state: &'a mut ExecutionStateView<C>,
40    txn_tracker: &'a mut TransactionTracker,
41    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
42}
43
44#[cfg(with_metrics)]
45mod metrics {
46    use std::sync::LazyLock;
47
48    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
49    use prometheus::HistogramVec;
50
51    /// Histogram of the latency to load a contract bytecode.
52    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
53        register_histogram_vec(
54            "load_contract_latency",
55            "Load contract latency",
56            &[],
57            exponential_bucket_latencies(250.0),
58        )
59    });
60
61    /// Histogram of the latency to load a service bytecode.
62    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
63        register_histogram_vec(
64            "load_service_latency",
65            "Load service latency",
66            &[],
67            exponential_bucket_latencies(250.0),
68        )
69    });
70}
71
72pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
73
74impl<'a, C> ExecutionStateActor<'a, C>
75where
76    C: Context + Clone + 'static,
77    C::Extra: ExecutionRuntimeContext,
78{
79    /// Creates a new execution state actor.
80    pub fn new(
81        state: &'a mut ExecutionStateView<C>,
82        txn_tracker: &'a mut TransactionTracker,
83        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
84    ) -> Self {
85        Self {
86            state,
87            txn_tracker,
88            resource_controller,
89        }
90    }
91
92    pub(crate) async fn load_contract(
93        &mut self,
94        id: ApplicationId,
95    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
96        #[cfg(with_metrics)]
97        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
98        let blob_id = id.description_blob_id();
99        let description = match self.txn_tracker.get_blob_content(&blob_id) {
100            Some(blob) => bcs::from_bytes(blob.bytes())?,
101            None => {
102                self.state
103                    .system
104                    .describe_application(id, self.txn_tracker)
105                    .await?
106            }
107        };
108        let code = self
109            .state
110            .context()
111            .extra()
112            .get_user_contract(&description, self.txn_tracker)
113            .await?;
114        Ok((code, description))
115    }
116
117    pub(crate) async fn load_service(
118        &mut self,
119        id: ApplicationId,
120    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
121        #[cfg(with_metrics)]
122        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
123        let blob_id = id.description_blob_id();
124        let description = match self.txn_tracker.get_blob_content(&blob_id) {
125            Some(blob) => bcs::from_bytes(blob.bytes())?,
126            None => {
127                self.state
128                    .system
129                    .describe_application(id, self.txn_tracker)
130                    .await?
131            }
132        };
133        let code = self
134            .state
135            .context()
136            .extra()
137            .get_user_service(&description, self.txn_tracker)
138            .await?;
139        Ok((code, description))
140    }
141
142    // TODO(#1416): Support concurrent I/O.
143    pub(crate) async fn handle_request(
144        &mut self,
145        request: ExecutionRequest,
146    ) -> Result<(), ExecutionError> {
147        use ExecutionRequest::*;
148        match request {
149            #[cfg(not(web))]
150            LoadContract { id, callback } => {
151                let (code, description) = self.load_contract(id).await?;
152                callback.respond((code, description))
153            }
154            #[cfg(not(web))]
155            LoadService { id, callback } => {
156                let (code, description) = self.load_service(id).await?;
157                callback.respond((code, description))
158            }
159
160            ChainBalance { callback } => {
161                let balance = *self.state.system.balance.get();
162                callback.respond(balance);
163            }
164
165            OwnerBalance { owner, callback } => {
166                let balance = self
167                    .state
168                    .system
169                    .balances
170                    .get(&owner)
171                    .await?
172                    .unwrap_or_default();
173                callback.respond(balance);
174            }
175
176            OwnerBalances { callback } => {
177                callback.respond(self.state.system.balances.index_values().await?);
178            }
179
180            BalanceOwners { callback } => {
181                let owners = self.state.system.balances.indices().await?;
182                callback.respond(owners);
183            }
184
185            Transfer {
186                source,
187                destination,
188                amount,
189                signer,
190                application_id,
191                callback,
192            } => {
193                let maybe_message = self
194                    .state
195                    .system
196                    .transfer(signer, Some(application_id), source, destination, amount)
197                    .await?;
198                self.txn_tracker.add_outgoing_messages(maybe_message);
199                callback.respond(());
200            }
201
202            Claim {
203                source,
204                destination,
205                amount,
206                signer,
207                application_id,
208                callback,
209            } => {
210                let maybe_message = self
211                    .state
212                    .system
213                    .claim(
214                        signer,
215                        Some(application_id),
216                        source.owner,
217                        source.chain_id,
218                        destination,
219                        amount,
220                    )
221                    .await?;
222                self.txn_tracker.add_outgoing_messages(maybe_message);
223                callback.respond(());
224            }
225
226            SystemTimestamp { callback } => {
227                let timestamp = *self.state.system.timestamp.get();
228                callback.respond(timestamp);
229            }
230
231            ChainOwnership { callback } => {
232                let ownership = self.state.system.ownership.get().clone();
233                callback.respond(ownership);
234            }
235
236            ApplicationPermissions { callback } => {
237                let permissions = self.state.system.application_permissions.get().clone();
238                callback.respond(permissions);
239            }
240
241            ReadApplicationDescription {
242                application_id,
243                callback,
244            } => {
245                let blob_id = application_id.description_blob_id();
246                let description = match self.txn_tracker.get_blob_content(&blob_id) {
247                    Some(blob) => bcs::from_bytes(blob.bytes())?,
248                    None => {
249                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
250                        self.state
251                            .system
252                            .blob_used(self.txn_tracker, blob_id)
253                            .await?;
254                        bcs::from_bytes(blob_content.bytes())?
255                    }
256                };
257                callback.respond(description);
258            }
259
260            ContainsKey { id, key, callback } => {
261                let view = self.state.users.try_load_entry(&id).await?;
262                let result = match view {
263                    Some(view) => view.contains_key(&key).await?,
264                    None => false,
265                };
266                callback.respond(result);
267            }
268
269            ContainsKeys { id, keys, callback } => {
270                let view = self.state.users.try_load_entry(&id).await?;
271                let result = match view {
272                    Some(view) => view.contains_keys(&keys).await?,
273                    None => vec![false; keys.len()],
274                };
275                callback.respond(result);
276            }
277
278            ReadMultiValuesBytes { id, keys, callback } => {
279                let view = self.state.users.try_load_entry(&id).await?;
280                let values = match view {
281                    Some(view) => view.multi_get(&keys).await?,
282                    None => vec![None; keys.len()],
283                };
284                callback.respond(values);
285            }
286
287            ReadValueBytes { id, key, callback } => {
288                let view = self.state.users.try_load_entry(&id).await?;
289                let result = match view {
290                    Some(view) => view.get(&key).await?,
291                    None => None,
292                };
293                callback.respond(result);
294            }
295
296            FindKeysByPrefix {
297                id,
298                key_prefix,
299                callback,
300            } => {
301                let view = self.state.users.try_load_entry(&id).await?;
302                let result = match view {
303                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
304                    None => Vec::new(),
305                };
306                callback.respond(result);
307            }
308
309            FindKeyValuesByPrefix {
310                id,
311                key_prefix,
312                callback,
313            } => {
314                let view = self.state.users.try_load_entry(&id).await?;
315                let result = match view {
316                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
317                    None => Vec::new(),
318                };
319                callback.respond(result);
320            }
321
322            WriteBatch {
323                id,
324                batch,
325                callback,
326            } => {
327                let mut view = self.state.users.try_load_entry_mut(&id).await?;
328                view.write_batch(batch).await?;
329                callback.respond(());
330            }
331
332            OpenChain {
333                ownership,
334                balance,
335                parent_id,
336                block_height,
337                application_permissions,
338                timestamp,
339                callback,
340            } => {
341                let config = OpenChainConfig {
342                    ownership,
343                    balance,
344                    application_permissions,
345                };
346                let chain_id = self
347                    .state
348                    .system
349                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
350                    .await?;
351                callback.respond(chain_id);
352            }
353
354            CloseChain {
355                application_id,
356                callback,
357            } => {
358                let app_permissions = self.state.system.application_permissions.get();
359                if !app_permissions.can_manage_chain(&application_id) {
360                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
361                } else {
362                    self.state.system.close_chain();
363                    callback.respond(Ok(()));
364                }
365            }
366
367            ChangeOwnership {
368                application_id,
369                ownership,
370                callback,
371            } => {
372                let app_permissions = self.state.system.application_permissions.get();
373                if !app_permissions.can_manage_chain(&application_id) {
374                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
375                } else {
376                    self.state.system.ownership.set(ownership);
377                    callback.respond(Ok(()));
378                }
379            }
380
381            ChangeApplicationPermissions {
382                application_id,
383                application_permissions,
384                callback,
385            } => {
386                let app_permissions = self.state.system.application_permissions.get();
387                if !app_permissions.can_manage_chain(&application_id) {
388                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
389                } else {
390                    self.state
391                        .system
392                        .application_permissions
393                        .set(application_permissions);
394                    callback.respond(Ok(()));
395                }
396            }
397
398            PeekApplicationIndex { callback } => {
399                let index = self.txn_tracker.peek_application_index();
400                callback.respond(index)
401            }
402
403            CreateApplication {
404                chain_id,
405                block_height,
406                module_id,
407                parameters,
408                required_application_ids,
409                callback,
410            } => {
411                let create_application_result = self
412                    .state
413                    .system
414                    .create_application(
415                        chain_id,
416                        block_height,
417                        module_id,
418                        parameters,
419                        required_application_ids,
420                        self.txn_tracker,
421                    )
422                    .await?;
423                callback.respond(create_application_result);
424            }
425
426            PerformHttpRequest {
427                request,
428                http_responses_are_oracle_responses,
429                callback,
430            } => {
431                let system = &mut self.state.system;
432                let response = self
433                    .txn_tracker
434                    .oracle(|| async {
435                        let headers = request
436                            .headers
437                            .into_iter()
438                            .map(|http::Header { name, value }| {
439                                Ok((name.parse()?, value.try_into()?))
440                            })
441                            .collect::<Result<HeaderMap, ExecutionError>>()?;
442
443                        let url = Url::parse(&request.url)?;
444                        let host = url
445                            .host_str()
446                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
447
448                        let (_epoch, committee) = system
449                            .current_committee()
450                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
451                        let allowed_hosts = &committee.policy().http_request_allow_list;
452
453                        ensure!(
454                            allowed_hosts.contains(host),
455                            ExecutionError::UnauthorizedHttpRequest(url)
456                        );
457
458                        let request = Client::new()
459                            .request(request.method.into(), url)
460                            .body(request.body)
461                            .headers(headers);
462                        #[cfg(not(web))]
463                        let request = request.timeout(linera_base::time::Duration::from_millis(
464                            committee.policy().http_request_timeout_ms,
465                        ));
466
467                        let response = request.send().await?;
468
469                        let mut response_size_limit =
470                            committee.policy().maximum_http_response_bytes;
471
472                        if http_responses_are_oracle_responses {
473                            response_size_limit = response_size_limit
474                                .min(committee.policy().maximum_oracle_response_bytes);
475                        }
476                        Ok(OracleResponse::Http(
477                            Self::receive_http_response(response, response_size_limit).await?,
478                        ))
479                    })
480                    .await?
481                    .to_http_response()?;
482                callback.respond(response);
483            }
484
485            ReadBlobContent { blob_id, callback } => {
486                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
487                    content.clone()
488                } else {
489                    let content = self.state.system.read_blob_content(blob_id).await?;
490                    if blob_id.blob_type == BlobType::Data {
491                        self.resource_controller
492                            .with_state(&mut self.state.system)
493                            .await?
494                            .track_blob_read(content.bytes().len() as u64)?;
495                    }
496                    self.state
497                        .system
498                        .blob_used(self.txn_tracker, blob_id)
499                        .await?;
500                    content
501                };
502                callback.respond(content)
503            }
504
505            AssertBlobExists { blob_id, callback } => {
506                self.state.system.assert_blob_exists(blob_id).await?;
507                // Treating this as reading a size-0 blob for fee purposes.
508                if blob_id.blob_type == BlobType::Data {
509                    self.resource_controller
510                        .with_state(&mut self.state.system)
511                        .await?
512                        .track_blob_read(0)?;
513                }
514                let is_new = self
515                    .state
516                    .system
517                    .blob_used(self.txn_tracker, blob_id)
518                    .await?;
519                if is_new {
520                    self.txn_tracker
521                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
522                }
523                callback.respond(());
524            }
525
526            Emit {
527                stream_id,
528                value,
529                callback,
530            } => {
531                let count = self
532                    .state
533                    .system
534                    .stream_event_counts
535                    .get_mut_or_default(&stream_id)
536                    .await?;
537                let index = *count;
538                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
539                self.resource_controller
540                    .with_state(&mut self.state.system)
541                    .await?
542                    .track_event_published(&value)?;
543                self.txn_tracker.add_event(stream_id, index, value);
544                callback.respond(index)
545            }
546
547            ReadEvent { event_id, callback } => {
548                let context = self.state.context();
549                let extra = context.extra();
550                let event = self
551                    .txn_tracker
552                    .oracle(|| async {
553                        let event = extra
554                            .get_event(event_id.clone())
555                            .await?
556                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
557                        Ok(OracleResponse::Event(event_id.clone(), event))
558                    })
559                    .await?
560                    .to_event(&event_id)?;
561                self.resource_controller
562                    .with_state(&mut self.state.system)
563                    .await?
564                    .track_event_read(event.len() as u64)?;
565                callback.respond(event);
566            }
567
568            SubscribeToEvents {
569                chain_id,
570                stream_id,
571                subscriber_app_id,
572                callback,
573            } => {
574                let subscriptions = self
575                    .state
576                    .system
577                    .event_subscriptions
578                    .get_mut_or_default(&(chain_id, stream_id.clone()))
579                    .await?;
580                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
581                    subscriptions.next_index
582                } else {
583                    0
584                };
585                self.txn_tracker.add_stream_to_process(
586                    subscriber_app_id,
587                    chain_id,
588                    stream_id,
589                    0,
590                    next_index,
591                );
592                callback.respond(());
593            }
594
595            UnsubscribeFromEvents {
596                chain_id,
597                stream_id,
598                subscriber_app_id,
599                callback,
600            } => {
601                let key = (chain_id, stream_id.clone());
602                let subscriptions = self
603                    .state
604                    .system
605                    .event_subscriptions
606                    .get_mut_or_default(&key)
607                    .await?;
608                subscriptions.applications.remove(&subscriber_app_id);
609                if subscriptions.applications.is_empty() {
610                    self.state.system.event_subscriptions.remove(&key)?;
611                }
612                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
613                    self.txn_tracker
614                        .remove_stream_to_process(app_id, chain_id, stream_id);
615                }
616                callback.respond(());
617            }
618
619            GetApplicationPermissions { callback } => {
620                let app_permissions = self.state.system.application_permissions.get();
621                callback.respond(app_permissions.clone());
622            }
623
624            QueryServiceOracle {
625                deadline,
626                application_id,
627                next_block_height,
628                query,
629                callback,
630            } => {
631                let state = &mut self.state;
632                let local_time = self.txn_tracker.local_time();
633                let created_blobs = self.txn_tracker.created_blobs().clone();
634                let bytes = self
635                    .txn_tracker
636                    .oracle(|| async {
637                        let context = QueryContext {
638                            chain_id: state.context().extra().chain_id(),
639                            next_block_height,
640                            local_time,
641                        };
642                        let QueryOutcome {
643                            response,
644                            operations,
645                        } = Box::pin(state.query_user_application_with_deadline(
646                            application_id,
647                            context,
648                            query,
649                            deadline,
650                            created_blobs,
651                        ))
652                        .await?;
653                        ensure!(
654                            operations.is_empty(),
655                            ExecutionError::ServiceOracleQueryOperations(operations)
656                        );
657                        Ok(OracleResponse::Service(response))
658                    })
659                    .await?
660                    .to_service_response()?;
661                callback.respond(bytes);
662            }
663
664            AddOutgoingMessage { message, callback } => {
665                self.txn_tracker.add_outgoing_message(message);
666                callback.respond(());
667            }
668
669            SetLocalTime {
670                local_time,
671                callback,
672            } => {
673                self.txn_tracker.set_local_time(local_time);
674                callback.respond(());
675            }
676
677            AssertBefore {
678                timestamp,
679                callback,
680            } => {
681                let result = if !self
682                    .txn_tracker
683                    .replay_oracle_response(OracleResponse::Assert)?
684                {
685                    // There are no recorded oracle responses, so we check the local time.
686                    let local_time = self.txn_tracker.local_time();
687                    if local_time >= timestamp {
688                        Err(ExecutionError::AssertBefore {
689                            timestamp,
690                            local_time,
691                        })
692                    } else {
693                        Ok(())
694                    }
695                } else {
696                    Ok(())
697                };
698                callback.respond(result);
699            }
700
701            AddCreatedBlob { blob, callback } => {
702                self.txn_tracker.add_created_blob(blob);
703                callback.respond(());
704            }
705
706            ValidationRound { round, callback } => {
707                let validation_round = self
708                    .txn_tracker
709                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
710                    .await?
711                    .to_round()?;
712                callback.respond(validation_round);
713            }
714
715            TotalStorageSize {
716                application,
717                callback,
718            } => {
719                let view = self.state.users.try_load_entry(&application).await?;
720                let result = match view {
721                    Some(view) => {
722                        let total_size = view.total_size();
723                        (total_size.key, total_size.value)
724                    }
725                    None => (0, 0),
726                };
727                callback.respond(result);
728            }
729
730            AllowApplicationLogs { callback } => {
731                let allow = self
732                    .state
733                    .context()
734                    .extra()
735                    .execution_runtime_config()
736                    .allow_application_logs;
737                callback.respond(allow);
738            }
739
740            #[cfg(web)]
741            Log { message, level } => {
742                // Output directly to browser console with clean formatting
743                let formatted: js_sys::JsString = format!("[CONTRACT {level}] {message}").into();
744                match level {
745                    tracing::log::Level::Trace | tracing::log::Level::Debug => {
746                        web_sys::console::debug_1(&formatted)
747                    }
748                    tracing::log::Level::Info => web_sys::console::log_1(&formatted),
749                    tracing::log::Level::Warn => web_sys::console::warn_1(&formatted),
750                    tracing::log::Level::Error => web_sys::console::error_1(&formatted),
751                }
752            }
753        }
754
755        Ok(())
756    }
757
758    /// Calls `process_streams` for all applications that are subscribed to streams with new
759    /// events or that have new subscriptions.
760    async fn process_subscriptions(
761        &mut self,
762        context: ProcessStreamsContext,
763    ) -> Result<(), ExecutionError> {
764        // Keep track of which streams we have already processed. This is to guard against
765        // applications unsubscribing and subscribing in the process_streams call itself.
766        let mut processed = BTreeSet::new();
767        loop {
768            let to_process = self
769                .txn_tracker
770                .take_streams_to_process()
771                .into_iter()
772                .filter_map(|(app_id, updates)| {
773                    let updates = updates
774                        .into_iter()
775                        .filter_map(|update| {
776                            if !processed.insert((
777                                app_id,
778                                update.chain_id,
779                                update.stream_id.clone(),
780                            )) {
781                                return None;
782                            }
783                            Some(update)
784                        })
785                        .collect::<Vec<_>>();
786                    if updates.is_empty() {
787                        return None;
788                    }
789                    Some((app_id, updates))
790                })
791                .collect::<BTreeMap<_, _>>();
792            if to_process.is_empty() {
793                return Ok(());
794            }
795            for (app_id, updates) in to_process {
796                self.run_user_action(
797                    app_id,
798                    UserAction::ProcessStreams(context, updates),
799                    None,
800                    None,
801                )
802                .await?;
803            }
804        }
805    }
806
807    pub(crate) async fn run_user_action(
808        &mut self,
809        application_id: ApplicationId,
810        action: UserAction,
811        refund_grant_to: Option<Account>,
812        grant: Option<&mut Amount>,
813    ) -> Result<(), ExecutionError> {
814        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
815            .await
816    }
817
818    // TODO(#5034): unify with `contract_and_dependencies`
819    pub(crate) async fn service_and_dependencies(
820        &mut self,
821        application: ApplicationId,
822    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
823        // cyclic futures are illegal so we need to either box the frames or keep our own
824        // stack
825        let mut stack = vec![application];
826        let mut codes = vec![];
827        let mut descriptions = vec![];
828
829        while let Some(id) = stack.pop() {
830            let (code, description) = self.load_service(id).await?;
831            stack.extend(description.required_application_ids.iter().rev().copied());
832            codes.push(code);
833            descriptions.push(description);
834        }
835
836        codes.reverse();
837        descriptions.reverse();
838
839        Ok((codes, descriptions))
840    }
841
842    // TODO(#5034): unify with `service_and_dependencies`
843    async fn contract_and_dependencies(
844        &mut self,
845        application: ApplicationId,
846    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
847        // cyclic futures are illegal so we need to either box the frames or keep our own
848        // stack
849        let mut stack = vec![application];
850        let mut codes = vec![];
851        let mut descriptions = vec![];
852
853        while let Some(id) = stack.pop() {
854            let (code, description) = self.load_contract(id).await?;
855            stack.extend(description.required_application_ids.iter().rev().copied());
856            codes.push(code);
857            descriptions.push(description);
858        }
859
860        codes.reverse();
861        descriptions.reverse();
862
863        Ok((codes, descriptions))
864    }
865
866    async fn run_user_action_with_runtime(
867        &mut self,
868        application_id: ApplicationId,
869        action: UserAction,
870        refund_grant_to: Option<Account>,
871        grant: Option<&mut Amount>,
872    ) -> Result<(), ExecutionError> {
873        let chain_id = self.state.context().extra().chain_id();
874        let mut cloned_grant = grant.as_ref().map(|x| **x);
875        let initial_balance = self
876            .resource_controller
877            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
878            .await?
879            .balance()?;
880        let controller = ResourceController::new(
881            self.resource_controller.policy().clone(),
882            self.resource_controller.tracker,
883            initial_balance,
884        );
885        let (execution_state_sender, mut execution_state_receiver) =
886            futures::channel::mpsc::unbounded();
887
888        let (codes, descriptions): (Vec<_>, Vec<_>) =
889            self.contract_and_dependencies(application_id).await?;
890
891        let allow_application_logs = self
892            .state
893            .context()
894            .extra()
895            .execution_runtime_config()
896            .allow_application_logs;
897
898        let contract_runtime_task = self
899            .state
900            .context()
901            .extra()
902            .thread_pool()
903            .run_send(JsVec(codes), move |codes| async move {
904                let runtime = ContractSyncRuntime::new(
905                    execution_state_sender,
906                    chain_id,
907                    refund_grant_to,
908                    controller,
909                    &action,
910                    allow_application_logs,
911                );
912
913                for (code, description) in codes.0.into_iter().zip(descriptions) {
914                    runtime.preload_contract(
915                        ApplicationId::from(&description),
916                        code,
917                        description,
918                    )?;
919                }
920
921                runtime.run_action(application_id, chain_id, action)
922            })
923            .await;
924
925        while let Some(request) = execution_state_receiver.next().await {
926            self.handle_request(request).await?;
927        }
928
929        let (result, controller) = contract_runtime_task.await??;
930
931        self.txn_tracker.add_operation_result(result);
932
933        self.resource_controller
934            .with_state_and_grant(&mut self.state.system, grant)
935            .await?
936            .merge_balance(initial_balance, controller.balance()?)?;
937        self.resource_controller.tracker = controller.tracker;
938
939        Ok(())
940    }
941
942    pub async fn execute_operation(
943        &mut self,
944        context: OperationContext,
945        operation: Operation,
946    ) -> Result<(), ExecutionError> {
947        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
948        match operation {
949            Operation::System(op) => {
950                let new_application = self
951                    .state
952                    .system
953                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
954                    .await?;
955                if let Some((application_id, argument)) = new_application {
956                    let user_action = UserAction::Instantiate(context, argument);
957                    self.run_user_action(
958                        application_id,
959                        user_action,
960                        context.refund_grant_to(),
961                        None,
962                    )
963                    .await?;
964                }
965            }
966            Operation::User {
967                application_id,
968                bytes,
969            } => {
970                self.run_user_action(
971                    application_id,
972                    UserAction::Operation(context, bytes),
973                    context.refund_grant_to(),
974                    None,
975                )
976                .await?;
977            }
978        }
979        self.process_subscriptions(context.into()).await?;
980        Ok(())
981    }
982
983    pub async fn execute_message(
984        &mut self,
985        context: MessageContext,
986        message: Message,
987        grant: Option<&mut Amount>,
988    ) -> Result<(), ExecutionError> {
989        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
990        match message {
991            Message::System(message) => {
992                let outcome = self.state.system.execute_message(context, message).await?;
993                self.txn_tracker.add_outgoing_messages(outcome);
994            }
995            Message::User {
996                application_id,
997                bytes,
998            } => {
999                self.run_user_action(
1000                    application_id,
1001                    UserAction::Message(context, bytes),
1002                    context.refund_grant_to,
1003                    grant,
1004                )
1005                .await?;
1006            }
1007        }
1008        self.process_subscriptions(context.into()).await?;
1009        Ok(())
1010    }
1011
1012    pub fn bounce_message(
1013        &mut self,
1014        context: MessageContext,
1015        grant: Amount,
1016        message: Message,
1017    ) -> Result<(), ExecutionError> {
1018        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1019        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1020            destination: context.origin,
1021            authenticated_owner: context.authenticated_owner,
1022            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1023            grant,
1024            kind: MessageKind::Bouncing,
1025            message,
1026        });
1027        Ok(())
1028    }
1029
1030    pub fn send_refund(
1031        &mut self,
1032        context: MessageContext,
1033        amount: Amount,
1034    ) -> Result<(), ExecutionError> {
1035        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1036        if amount.is_zero() {
1037            return Ok(());
1038        }
1039        let Some(account) = context.refund_grant_to else {
1040            return Err(ExecutionError::InternalError(
1041                "Messages with grants should have a non-empty `refund_grant_to`",
1042            ));
1043        };
1044        let message = SystemMessage::Credit {
1045            amount,
1046            source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1047            target: account.owner,
1048        };
1049        self.txn_tracker.add_outgoing_message(
1050            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1051        );
1052        Ok(())
1053    }
1054
1055    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1056    ///
1057    /// Ensures that the response does not exceed the provided `size_limit`.
1058    async fn receive_http_response(
1059        response: reqwest::Response,
1060        size_limit: u64,
1061    ) -> Result<http::Response, ExecutionError> {
1062        let status = response.status().as_u16();
1063        let maybe_content_length = response.content_length();
1064
1065        let headers = response
1066            .headers()
1067            .iter()
1068            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1069            .collect::<Vec<_>>();
1070
1071        let total_header_size = headers
1072            .iter()
1073            .map(|header| (header.name.len() + header.value.len()) as u64)
1074            .sum();
1075
1076        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1077            ExecutionError::HttpResponseSizeLimitExceeded {
1078                limit: size_limit,
1079                size: total_header_size,
1080            },
1081        )?;
1082
1083        if let Some(content_length) = maybe_content_length {
1084            if content_length > remaining_bytes {
1085                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1086                    limit: size_limit,
1087                    size: content_length + total_header_size,
1088                });
1089            }
1090        }
1091
1092        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1093        let mut body_stream = response.bytes_stream();
1094
1095        while let Some(bytes) = body_stream.next().await.transpose()? {
1096            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1097                ExecutionError::HttpResponseSizeLimitExceeded {
1098                    limit: size_limit,
1099                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1100                },
1101            )?;
1102
1103            body.extend(&bytes);
1104        }
1105
1106        Ok(http::Response {
1107            status,
1108            headers,
1109            body,
1110        })
1111    }
1112}
1113
1114/// Requests to the execution state.
1115#[derive(Debug)]
1116pub enum ExecutionRequest {
1117    #[cfg(not(web))]
1118    LoadContract {
1119        id: ApplicationId,
1120        #[debug(skip)]
1121        callback: Sender<(UserContractCode, ApplicationDescription)>,
1122    },
1123
1124    #[cfg(not(web))]
1125    LoadService {
1126        id: ApplicationId,
1127        #[debug(skip)]
1128        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1129    },
1130
1131    ChainBalance {
1132        #[debug(skip)]
1133        callback: Sender<Amount>,
1134    },
1135
1136    OwnerBalance {
1137        owner: AccountOwner,
1138        #[debug(skip)]
1139        callback: Sender<Amount>,
1140    },
1141
1142    OwnerBalances {
1143        #[debug(skip)]
1144        callback: Sender<Vec<(AccountOwner, Amount)>>,
1145    },
1146
1147    BalanceOwners {
1148        #[debug(skip)]
1149        callback: Sender<Vec<AccountOwner>>,
1150    },
1151
1152    Transfer {
1153        source: AccountOwner,
1154        destination: Account,
1155        amount: Amount,
1156        #[debug(skip_if = Option::is_none)]
1157        signer: Option<AccountOwner>,
1158        application_id: ApplicationId,
1159        #[debug(skip)]
1160        callback: Sender<()>,
1161    },
1162
1163    Claim {
1164        source: Account,
1165        destination: Account,
1166        amount: Amount,
1167        #[debug(skip_if = Option::is_none)]
1168        signer: Option<AccountOwner>,
1169        application_id: ApplicationId,
1170        #[debug(skip)]
1171        callback: Sender<()>,
1172    },
1173
1174    SystemTimestamp {
1175        #[debug(skip)]
1176        callback: Sender<Timestamp>,
1177    },
1178
1179    ChainOwnership {
1180        #[debug(skip)]
1181        callback: Sender<ChainOwnership>,
1182    },
1183
1184    ApplicationPermissions {
1185        #[debug(skip)]
1186        callback: Sender<ApplicationPermissions>,
1187    },
1188
1189    ReadApplicationDescription {
1190        application_id: ApplicationId,
1191        #[debug(skip)]
1192        callback: Sender<ApplicationDescription>,
1193    },
1194
1195    ReadValueBytes {
1196        id: ApplicationId,
1197        #[debug(with = hex_debug)]
1198        key: Vec<u8>,
1199        #[debug(skip)]
1200        callback: Sender<Option<Vec<u8>>>,
1201    },
1202
1203    ContainsKey {
1204        id: ApplicationId,
1205        key: Vec<u8>,
1206        #[debug(skip)]
1207        callback: Sender<bool>,
1208    },
1209
1210    ContainsKeys {
1211        id: ApplicationId,
1212        #[debug(with = hex_vec_debug)]
1213        keys: Vec<Vec<u8>>,
1214        callback: Sender<Vec<bool>>,
1215    },
1216
1217    ReadMultiValuesBytes {
1218        id: ApplicationId,
1219        #[debug(with = hex_vec_debug)]
1220        keys: Vec<Vec<u8>>,
1221        #[debug(skip)]
1222        callback: Sender<Vec<Option<Vec<u8>>>>,
1223    },
1224
1225    FindKeysByPrefix {
1226        id: ApplicationId,
1227        #[debug(with = hex_debug)]
1228        key_prefix: Vec<u8>,
1229        #[debug(skip)]
1230        callback: Sender<Vec<Vec<u8>>>,
1231    },
1232
1233    FindKeyValuesByPrefix {
1234        id: ApplicationId,
1235        #[debug(with = hex_debug)]
1236        key_prefix: Vec<u8>,
1237        #[debug(skip)]
1238        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1239    },
1240
1241    WriteBatch {
1242        id: ApplicationId,
1243        batch: Batch,
1244        #[debug(skip)]
1245        callback: Sender<()>,
1246    },
1247
1248    OpenChain {
1249        ownership: ChainOwnership,
1250        #[debug(skip_if = Amount::is_zero)]
1251        balance: Amount,
1252        parent_id: ChainId,
1253        block_height: BlockHeight,
1254        application_permissions: ApplicationPermissions,
1255        timestamp: Timestamp,
1256        #[debug(skip)]
1257        callback: Sender<ChainId>,
1258    },
1259
1260    CloseChain {
1261        application_id: ApplicationId,
1262        #[debug(skip)]
1263        callback: Sender<Result<(), ExecutionError>>,
1264    },
1265
1266    ChangeOwnership {
1267        application_id: ApplicationId,
1268        ownership: ChainOwnership,
1269        #[debug(skip)]
1270        callback: Sender<Result<(), ExecutionError>>,
1271    },
1272
1273    ChangeApplicationPermissions {
1274        application_id: ApplicationId,
1275        application_permissions: ApplicationPermissions,
1276        #[debug(skip)]
1277        callback: Sender<Result<(), ExecutionError>>,
1278    },
1279
1280    PeekApplicationIndex {
1281        #[debug(skip)]
1282        callback: Sender<u32>,
1283    },
1284
1285    CreateApplication {
1286        chain_id: ChainId,
1287        block_height: BlockHeight,
1288        module_id: ModuleId,
1289        parameters: Vec<u8>,
1290        required_application_ids: Vec<ApplicationId>,
1291        #[debug(skip)]
1292        callback: Sender<CreateApplicationResult>,
1293    },
1294
1295    PerformHttpRequest {
1296        request: http::Request,
1297        http_responses_are_oracle_responses: bool,
1298        #[debug(skip)]
1299        callback: Sender<http::Response>,
1300    },
1301
1302    ReadBlobContent {
1303        blob_id: BlobId,
1304        #[debug(skip)]
1305        callback: Sender<BlobContent>,
1306    },
1307
1308    AssertBlobExists {
1309        blob_id: BlobId,
1310        #[debug(skip)]
1311        callback: Sender<()>,
1312    },
1313
1314    Emit {
1315        stream_id: StreamId,
1316        #[debug(with = hex_debug)]
1317        value: Vec<u8>,
1318        #[debug(skip)]
1319        callback: Sender<u32>,
1320    },
1321
1322    ReadEvent {
1323        event_id: EventId,
1324        callback: oneshot::Sender<Vec<u8>>,
1325    },
1326
1327    SubscribeToEvents {
1328        chain_id: ChainId,
1329        stream_id: StreamId,
1330        subscriber_app_id: ApplicationId,
1331        #[debug(skip)]
1332        callback: Sender<()>,
1333    },
1334
1335    UnsubscribeFromEvents {
1336        chain_id: ChainId,
1337        stream_id: StreamId,
1338        subscriber_app_id: ApplicationId,
1339        #[debug(skip)]
1340        callback: Sender<()>,
1341    },
1342
1343    GetApplicationPermissions {
1344        #[debug(skip)]
1345        callback: Sender<ApplicationPermissions>,
1346    },
1347
1348    QueryServiceOracle {
1349        deadline: Option<Instant>,
1350        application_id: ApplicationId,
1351        next_block_height: BlockHeight,
1352        query: Vec<u8>,
1353        #[debug(skip)]
1354        callback: Sender<Vec<u8>>,
1355    },
1356
1357    AddOutgoingMessage {
1358        message: crate::OutgoingMessage,
1359        #[debug(skip)]
1360        callback: Sender<()>,
1361    },
1362
1363    SetLocalTime {
1364        local_time: Timestamp,
1365        #[debug(skip)]
1366        callback: Sender<()>,
1367    },
1368
1369    AssertBefore {
1370        timestamp: Timestamp,
1371        #[debug(skip)]
1372        callback: Sender<Result<(), ExecutionError>>,
1373    },
1374
1375    AddCreatedBlob {
1376        blob: crate::Blob,
1377        #[debug(skip)]
1378        callback: Sender<()>,
1379    },
1380
1381    ValidationRound {
1382        round: Option<u32>,
1383        #[debug(skip)]
1384        callback: Sender<Option<u32>>,
1385    },
1386
1387    TotalStorageSize {
1388        application: ApplicationId,
1389        #[debug(skip)]
1390        callback: Sender<(u32, u32)>,
1391    },
1392
1393    AllowApplicationLogs {
1394        #[debug(skip)]
1395        callback: Sender<bool>,
1396    },
1397
1398    /// Log message from contract execution (fire-and-forget, no callback needed).
1399    #[cfg(web)]
1400    Log {
1401        message: String,
1402        level: tracing::log::Level,
1403    },
1404}