linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::collections::{BTreeMap, BTreeSet};
7
8use custom_debug_derive::Debug;
9use futures::{channel::mpsc, StreamExt as _};
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
15        Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{
19        Account, AccountOwner, BlobId, BlobType, ChainId, EventId, OwnerSpender, StreamId,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23};
24use linera_views::{batch::Batch, context::Context, views::View};
25use oneshot::Sender;
26use reqwest::{header::HeaderMap, Client, Url};
27use tracing::{info_span, instrument, Instrument as _};
28
29use crate::{
30    execution::UserAction,
31    runtime::ContractSyncRuntime,
32    system::{CreateApplicationResult, OpenChainConfig},
33    util::{OracleResponseExt as _, RespondExt as _},
34    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
35    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
37    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
38};
39
40/// Actor for handling requests to the execution state.
41pub struct ExecutionStateActor<'a, C> {
42    state: &'a mut ExecutionStateView<C>,
43    txn_tracker: &'a mut TransactionTracker,
44    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
45}
46
47#[cfg(with_metrics)]
48mod metrics {
49    use std::sync::LazyLock;
50
51    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
52    use prometheus::HistogramVec;
53
54    /// Histogram of the latency to load a contract bytecode.
55    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
56        register_histogram_vec(
57            "load_contract_latency",
58            "Load contract latency",
59            &[],
60            exponential_bucket_latencies(250.0),
61        )
62    });
63
64    /// Histogram of the latency to load a service bytecode.
65    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
66        register_histogram_vec(
67            "load_service_latency",
68            "Load service latency",
69            &[],
70            exponential_bucket_latencies(250.0),
71        )
72    });
73}
74
75pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
76
77impl<'a, C> ExecutionStateActor<'a, C>
78where
79    C: Context + Clone + 'static,
80    C::Extra: ExecutionRuntimeContext,
81{
82    /// Creates a new execution state actor.
83    pub fn new(
84        state: &'a mut ExecutionStateView<C>,
85        txn_tracker: &'a mut TransactionTracker,
86        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
87    ) -> Self {
88        Self {
89            state,
90            txn_tracker,
91            resource_controller,
92        }
93    }
94
95    #[instrument(skip_all, fields(application_id = %id))]
96    pub(crate) async fn load_contract(
97        &mut self,
98        id: ApplicationId,
99    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
100        #[cfg(with_metrics)]
101        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
102        let blob_id = id.description_blob_id();
103        let description = match self.txn_tracker.get_blob_content(&blob_id) {
104            Some(blob) => bcs::from_bytes(blob.bytes())?,
105            None => {
106                self.state
107                    .system
108                    .describe_application(id, self.txn_tracker)
109                    .await?
110            }
111        };
112        let code = self
113            .state
114            .context()
115            .extra()
116            .get_user_contract(&description, self.txn_tracker)
117            .await?;
118        Ok((code, description))
119    }
120
121    pub(crate) async fn load_service(
122        &mut self,
123        id: ApplicationId,
124    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
125        #[cfg(with_metrics)]
126        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
127        let blob_id = id.description_blob_id();
128        let description = match self.txn_tracker.get_blob_content(&blob_id) {
129            Some(blob) => bcs::from_bytes(blob.bytes())?,
130            None => {
131                self.state
132                    .system
133                    .describe_application(id, self.txn_tracker)
134                    .await?
135            }
136        };
137        let code = self
138            .state
139            .context()
140            .extra()
141            .get_user_service(&description, self.txn_tracker)
142            .await?;
143        Ok((code, description))
144    }
145
146    // TODO(#1416): Support concurrent I/O.
147    #[instrument(
148        skip_all,
149        fields(request_type = %request.as_ref())
150    )]
151    pub(crate) async fn handle_request(
152        &mut self,
153        request: ExecutionRequest,
154    ) -> Result<(), ExecutionError> {
155        use ExecutionRequest::*;
156        match request {
157            #[cfg(not(web))]
158            LoadContract { id, callback } => {
159                let (code, description) = self.load_contract(id).await?;
160                callback.respond((code, description))
161            }
162            #[cfg(not(web))]
163            LoadService { id, callback } => {
164                let (code, description) = self.load_service(id).await?;
165                callback.respond((code, description))
166            }
167
168            ChainBalance { callback } => {
169                let balance = *self.state.system.balance.get();
170                callback.respond(balance);
171            }
172
173            OwnerBalance { owner, callback } => {
174                let balance = self
175                    .state
176                    .system
177                    .balances
178                    .get(&owner)
179                    .await?
180                    .unwrap_or_default();
181                callback.respond(balance);
182            }
183
184            OwnerBalances { callback } => {
185                callback.respond(self.state.system.balances.index_values().await?);
186            }
187
188            BalanceOwners { callback } => {
189                let owners = self.state.system.balances.indices().await?;
190                callback.respond(owners);
191            }
192
193            Allowance {
194                owner,
195                spender,
196                callback,
197            } => {
198                let owner_spender = OwnerSpender::new(owner, spender);
199                let allowance = self
200                    .state
201                    .system
202                    .allowances
203                    .get(&owner_spender)
204                    .await?
205                    .unwrap_or_default();
206                callback.respond(allowance);
207            }
208
209            Allowances { callback } => {
210                let entries: Vec<_> = self
211                    .state
212                    .system
213                    .allowances
214                    .index_values()
215                    .await?
216                    .into_iter()
217                    .map(|(os, amount)| (os.owner, os.spender, amount))
218                    .collect();
219                callback.respond(entries);
220            }
221
222            Transfer {
223                source,
224                destination,
225                amount,
226                signer,
227                application_id,
228                callback,
229            } => {
230                let maybe_message = self
231                    .state
232                    .system
233                    .transfer(signer, Some(application_id), source, destination, amount)
234                    .await?;
235                self.txn_tracker.add_outgoing_messages(maybe_message);
236                callback.respond(());
237            }
238
239            Claim {
240                source,
241                destination,
242                amount,
243                signer,
244                application_id,
245                callback,
246            } => {
247                let maybe_message = self
248                    .state
249                    .system
250                    .claim(
251                        signer,
252                        Some(application_id),
253                        source.owner,
254                        source.chain_id,
255                        destination,
256                        amount,
257                    )
258                    .await?;
259                self.txn_tracker.add_outgoing_messages(maybe_message);
260                callback.respond(());
261            }
262
263            Approve {
264                owner,
265                spender,
266                amount,
267                signer,
268                application_id,
269                callback,
270            } => {
271                self.state
272                    .system
273                    .approve(signer, Some(application_id), owner, spender, amount)
274                    .await?;
275                callback.respond(());
276            }
277
278            TransferFrom {
279                owner,
280                spender,
281                destination,
282                amount,
283                signer,
284                application_id,
285                callback,
286            } => {
287                let maybe_message = self
288                    .state
289                    .system
290                    .transfer_from(
291                        signer,
292                        Some(application_id),
293                        owner,
294                        spender,
295                        destination,
296                        amount,
297                    )
298                    .await?;
299                self.txn_tracker.add_outgoing_messages(maybe_message);
300                callback.respond(());
301            }
302
303            SystemTimestamp { callback } => {
304                let timestamp = *self.state.system.timestamp.get();
305                callback.respond(timestamp);
306            }
307
308            ChainOwnership { callback } => {
309                let ownership = self.state.system.ownership.get().clone();
310                callback.respond(ownership);
311            }
312
313            ApplicationPermissions { callback } => {
314                let permissions = self.state.system.application_permissions.get().clone();
315                callback.respond(permissions);
316            }
317
318            ReadApplicationDescription {
319                application_id,
320                callback,
321            } => {
322                let blob_id = application_id.description_blob_id();
323                let description = match self.txn_tracker.get_blob_content(&blob_id) {
324                    Some(blob) => bcs::from_bytes(blob.bytes())?,
325                    None => {
326                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
327                        self.state
328                            .system
329                            .blob_used(self.txn_tracker, blob_id)
330                            .await?;
331                        bcs::from_bytes(blob_content.bytes())?
332                    }
333                };
334                callback.respond(description);
335            }
336
337            ContainsKey { id, key, callback } => {
338                let view = self.state.users.try_load_entry(&id).await?;
339                let result = match view {
340                    Some(view) => view.contains_key(&key).await?,
341                    None => false,
342                };
343                callback.respond(result);
344            }
345
346            ContainsKeys { id, keys, callback } => {
347                let view = self.state.users.try_load_entry(&id).await?;
348                let result = match view {
349                    Some(view) => view.contains_keys(&keys).await?,
350                    None => vec![false; keys.len()],
351                };
352                callback.respond(result);
353            }
354
355            ReadMultiValuesBytes { id, keys, callback } => {
356                let view = self.state.users.try_load_entry(&id).await?;
357                let values = match view {
358                    Some(view) => view.multi_get(&keys).await?,
359                    None => vec![None; keys.len()],
360                };
361                callback.respond(values);
362            }
363
364            ReadValueBytes { id, key, callback } => {
365                let view = self.state.users.try_load_entry(&id).await?;
366                let result = match view {
367                    Some(view) => view.get(&key).await?,
368                    None => None,
369                };
370                callback.respond(result);
371            }
372
373            FindKeysByPrefix {
374                id,
375                key_prefix,
376                callback,
377            } => {
378                let view = self.state.users.try_load_entry(&id).await?;
379                let result = match view {
380                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
381                    None => Vec::new(),
382                };
383                callback.respond(result);
384            }
385
386            FindKeyValuesByPrefix {
387                id,
388                key_prefix,
389                callback,
390            } => {
391                let view = self.state.users.try_load_entry(&id).await?;
392                let result = match view {
393                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
394                    None => Vec::new(),
395                };
396                callback.respond(result);
397            }
398
399            WriteBatch {
400                id,
401                batch,
402                callback,
403            } => {
404                let mut view = self.state.users.try_load_entry_mut(&id).await?;
405                view.write_batch(batch).await?;
406                callback.respond(());
407            }
408
409            OpenChain {
410                ownership,
411                balance,
412                parent_id,
413                block_height,
414                application_permissions,
415                timestamp,
416                callback,
417            } => {
418                let config = OpenChainConfig {
419                    ownership,
420                    balance,
421                    application_permissions,
422                };
423                let chain_id = self
424                    .state
425                    .system
426                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
427                    .await?;
428                callback.respond(chain_id);
429            }
430
431            CloseChain {
432                application_id,
433                callback,
434            } => {
435                let app_permissions = self.state.system.application_permissions.get();
436                if !app_permissions.can_manage_chain(&application_id) {
437                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
438                } else {
439                    self.state.system.close_chain();
440                    callback.respond(Ok(()));
441                }
442            }
443
444            ChangeOwnership {
445                application_id,
446                ownership,
447                callback,
448            } => {
449                let app_permissions = self.state.system.application_permissions.get();
450                if !app_permissions.can_manage_chain(&application_id) {
451                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
452                } else {
453                    self.state.system.ownership.set(ownership);
454                    callback.respond(Ok(()));
455                }
456            }
457
458            ChangeApplicationPermissions {
459                application_id,
460                application_permissions,
461                callback,
462            } => {
463                let app_permissions = self.state.system.application_permissions.get();
464                if !app_permissions.can_manage_chain(&application_id) {
465                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
466                } else {
467                    self.state
468                        .system
469                        .application_permissions
470                        .set(application_permissions);
471                    callback.respond(Ok(()));
472                }
473            }
474
475            PeekApplicationIndex { callback } => {
476                let index = self.txn_tracker.peek_application_index();
477                callback.respond(index)
478            }
479
480            CreateApplication {
481                chain_id,
482                block_height,
483                module_id,
484                parameters,
485                required_application_ids,
486                callback,
487            } => {
488                let create_application_result = self
489                    .state
490                    .system
491                    .create_application(
492                        chain_id,
493                        block_height,
494                        module_id,
495                        parameters,
496                        required_application_ids,
497                        self.txn_tracker,
498                    )
499                    .await?;
500                callback.respond(create_application_result);
501            }
502
503            PerformHttpRequest {
504                request,
505                http_responses_are_oracle_responses,
506                callback,
507            } => {
508                let system = &mut self.state.system;
509                let response = self
510                    .txn_tracker
511                    .oracle(|| async {
512                        let headers = request
513                            .headers
514                            .into_iter()
515                            .map(|http::Header { name, value }| {
516                                Ok((name.parse()?, value.try_into()?))
517                            })
518                            .collect::<Result<HeaderMap, ExecutionError>>()?;
519
520                        let url = Url::parse(&request.url)?;
521                        let host = url
522                            .host_str()
523                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
524
525                        let (_epoch, committee) = system
526                            .current_committee()
527                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
528                        let allowed_hosts = &committee.policy().http_request_allow_list;
529
530                        ensure!(
531                            allowed_hosts.contains(host),
532                            ExecutionError::UnauthorizedHttpRequest(url)
533                        );
534
535                        let request = Client::new()
536                            .request(request.method.into(), url)
537                            .body(request.body)
538                            .headers(headers);
539                        #[cfg(not(web))]
540                        let request = request.timeout(linera_base::time::Duration::from_millis(
541                            committee.policy().http_request_timeout_ms,
542                        ));
543
544                        let response = request.send().await?;
545
546                        let mut response_size_limit =
547                            committee.policy().maximum_http_response_bytes;
548
549                        if http_responses_are_oracle_responses {
550                            response_size_limit = response_size_limit
551                                .min(committee.policy().maximum_oracle_response_bytes);
552                        }
553                        Ok(OracleResponse::Http(
554                            Self::receive_http_response(response, response_size_limit).await?,
555                        ))
556                    })
557                    .await?
558                    .to_http_response()?;
559                callback.respond(response);
560            }
561
562            ReadBlobContent { blob_id, callback } => {
563                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
564                    content.clone()
565                } else {
566                    let content = self.state.system.read_blob_content(blob_id).await?;
567                    if blob_id.blob_type == BlobType::Data {
568                        self.resource_controller
569                            .with_state(&mut self.state.system)
570                            .await?
571                            .track_blob_read(content.bytes().len() as u64)?;
572                    }
573                    self.state
574                        .system
575                        .blob_used(self.txn_tracker, blob_id)
576                        .await?;
577                    content
578                };
579                callback.respond(content)
580            }
581
582            AssertBlobExists { blob_id, callback } => {
583                self.state.system.assert_blob_exists(blob_id).await?;
584                // Treating this as reading a size-0 blob for fee purposes.
585                if blob_id.blob_type == BlobType::Data {
586                    self.resource_controller
587                        .with_state(&mut self.state.system)
588                        .await?
589                        .track_blob_read(0)?;
590                }
591                let is_new = self
592                    .state
593                    .system
594                    .blob_used(self.txn_tracker, blob_id)
595                    .await?;
596                if is_new {
597                    self.txn_tracker
598                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
599                }
600                callback.respond(());
601            }
602
603            Emit {
604                stream_id,
605                value,
606                callback,
607            } => {
608                let count = self
609                    .state
610                    .system
611                    .stream_event_counts
612                    .get_mut_or_default(&stream_id)
613                    .await?;
614                let index = *count;
615                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
616                self.resource_controller
617                    .with_state(&mut self.state.system)
618                    .await?
619                    .track_event_published(&value)?;
620                self.txn_tracker.add_event(stream_id, index, value);
621                callback.respond(index)
622            }
623
624            ReadEvent { event_id, callback } => {
625                let context = self.state.context();
626                let extra = context.extra();
627                let event = self
628                    .txn_tracker
629                    .oracle(|| async {
630                        let event = extra
631                            .get_event(event_id.clone())
632                            .await?
633                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
634                        Ok(OracleResponse::Event(event_id.clone(), event))
635                    })
636                    .await?
637                    .to_event(&event_id)?;
638                self.resource_controller
639                    .with_state(&mut self.state.system)
640                    .await?
641                    .track_event_read(event.len() as u64)?;
642                callback.respond(event);
643            }
644
645            SubscribeToEvents {
646                chain_id,
647                stream_id,
648                subscriber_app_id,
649                callback,
650            } => {
651                let subscriptions = self
652                    .state
653                    .system
654                    .event_subscriptions
655                    .get_mut_or_default(&(chain_id, stream_id.clone()))
656                    .await?;
657                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
658                    subscriptions.next_index
659                } else {
660                    0
661                };
662                self.txn_tracker.add_stream_to_process(
663                    subscriber_app_id,
664                    chain_id,
665                    stream_id,
666                    0,
667                    next_index,
668                );
669                callback.respond(());
670            }
671
672            UnsubscribeFromEvents {
673                chain_id,
674                stream_id,
675                subscriber_app_id,
676                callback,
677            } => {
678                let key = (chain_id, stream_id.clone());
679                let subscriptions = self
680                    .state
681                    .system
682                    .event_subscriptions
683                    .get_mut_or_default(&key)
684                    .await?;
685                subscriptions.applications.remove(&subscriber_app_id);
686                if subscriptions.applications.is_empty() {
687                    self.state.system.event_subscriptions.remove(&key)?;
688                }
689                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
690                    self.txn_tracker
691                        .remove_stream_to_process(app_id, chain_id, stream_id);
692                }
693                callback.respond(());
694            }
695
696            GetApplicationPermissions { callback } => {
697                let app_permissions = self.state.system.application_permissions.get();
698                callback.respond(app_permissions.clone());
699            }
700
701            QueryServiceOracle {
702                deadline,
703                application_id,
704                next_block_height,
705                query,
706                callback,
707            } => {
708                let state = &mut self.state;
709                let local_time = self.txn_tracker.local_time();
710                let created_blobs = self.txn_tracker.created_blobs().clone();
711                let bytes = self
712                    .txn_tracker
713                    .oracle(|| async {
714                        let context = QueryContext {
715                            chain_id: state.context().extra().chain_id(),
716                            next_block_height,
717                            local_time,
718                        };
719                        let QueryOutcome {
720                            response,
721                            operations,
722                        } = Box::pin(state.query_user_application_with_deadline(
723                            application_id,
724                            context,
725                            query,
726                            deadline,
727                            created_blobs,
728                        ))
729                        .await?;
730                        ensure!(
731                            operations.is_empty(),
732                            ExecutionError::ServiceOracleQueryOperations(operations)
733                        );
734                        Ok(OracleResponse::Service(response))
735                    })
736                    .await?
737                    .to_service_response()?;
738                callback.respond(bytes);
739            }
740
741            AddOutgoingMessage { message, callback } => {
742                self.txn_tracker.add_outgoing_message(message);
743                callback.respond(());
744            }
745
746            SetLocalTime {
747                local_time,
748                callback,
749            } => {
750                self.txn_tracker.set_local_time(local_time);
751                callback.respond(());
752            }
753
754            AssertBefore {
755                timestamp,
756                callback,
757            } => {
758                let result = if !self
759                    .txn_tracker
760                    .replay_oracle_response(OracleResponse::Assert)?
761                {
762                    // There are no recorded oracle responses, so we check the local time.
763                    let local_time = self.txn_tracker.local_time();
764                    if local_time >= timestamp {
765                        Err(ExecutionError::AssertBefore {
766                            timestamp,
767                            local_time,
768                        })
769                    } else {
770                        Ok(())
771                    }
772                } else {
773                    Ok(())
774                };
775                callback.respond(result);
776            }
777
778            AddCreatedBlob { blob, callback } => {
779                if self.resource_controller.is_free {
780                    self.txn_tracker.mark_blob_free(blob.id());
781                }
782                self.txn_tracker.add_created_blob(blob);
783                callback.respond(());
784            }
785
786            ValidationRound { round, callback } => {
787                let validation_round = self
788                    .txn_tracker
789                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
790                    .await?
791                    .to_round()?;
792                callback.respond(validation_round);
793            }
794
795            TotalStorageSize {
796                application,
797                callback,
798            } => {
799                let view = self.state.users.try_load_entry(&application).await?;
800                let result = match view {
801                    Some(view) => {
802                        let total_size = view.total_size();
803                        (total_size.key, total_size.value)
804                    }
805                    None => (0, 0),
806                };
807                callback.respond(result);
808            }
809
810            AllowApplicationLogs { callback } => {
811                let allow = self
812                    .state
813                    .context()
814                    .extra()
815                    .execution_runtime_config()
816                    .allow_application_logs;
817                callback.respond(allow);
818            }
819
820            #[cfg(web)]
821            Log { message, level } => match level {
822                tracing::log::Level::Trace | tracing::log::Level::Debug => {
823                    tracing::debug!(target: "user_application_log", message = %message);
824                }
825                tracing::log::Level::Info => {
826                    tracing::info!(target: "user_application_log", message = %message);
827                }
828                tracing::log::Level::Warn => {
829                    tracing::warn!(target: "user_application_log", message = %message);
830                }
831                tracing::log::Level::Error => {
832                    tracing::error!(target: "user_application_log", message = %message);
833                }
834            },
835        }
836
837        Ok(())
838    }
839
840    /// Calls `process_streams` for all applications that are subscribed to streams with new
841    /// events or that have new subscriptions.
842    #[instrument(skip_all)]
843    async fn process_subscriptions(
844        &mut self,
845        context: ProcessStreamsContext,
846    ) -> Result<(), ExecutionError> {
847        // Keep track of which streams we have already processed. This is to guard against
848        // applications unsubscribing and subscribing in the process_streams call itself.
849        let mut processed = BTreeSet::new();
850        loop {
851            let to_process = self
852                .txn_tracker
853                .take_streams_to_process()
854                .into_iter()
855                .filter_map(|(app_id, updates)| {
856                    let updates = updates
857                        .into_iter()
858                        .filter_map(|update| {
859                            if !processed.insert((
860                                app_id,
861                                update.chain_id,
862                                update.stream_id.clone(),
863                            )) {
864                                return None;
865                            }
866                            Some(update)
867                        })
868                        .collect::<Vec<_>>();
869                    if updates.is_empty() {
870                        return None;
871                    }
872                    Some((app_id, updates))
873                })
874                .collect::<BTreeMap<_, _>>();
875            if to_process.is_empty() {
876                return Ok(());
877            }
878            for (app_id, updates) in to_process {
879                self.run_user_action(
880                    app_id,
881                    UserAction::ProcessStreams(context, updates),
882                    None,
883                    None,
884                )
885                .await?;
886            }
887        }
888    }
889
890    pub(crate) async fn run_user_action(
891        &mut self,
892        application_id: ApplicationId,
893        action: UserAction,
894        refund_grant_to: Option<Account>,
895        grant: Option<&mut Amount>,
896    ) -> Result<(), ExecutionError> {
897        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
898            .await
899    }
900
901    // TODO(#5034): unify with `contract_and_dependencies`
902    pub(crate) async fn service_and_dependencies(
903        &mut self,
904        application: ApplicationId,
905    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
906        // cyclic futures are illegal so we need to either box the frames or keep our own
907        // stack
908        let mut stack = vec![application];
909        let mut codes = vec![];
910        let mut descriptions = vec![];
911
912        while let Some(id) = stack.pop() {
913            let (code, description) = self.load_service(id).await?;
914            stack.extend(description.required_application_ids.iter().rev().copied());
915            codes.push(code);
916            descriptions.push(description);
917        }
918
919        codes.reverse();
920        descriptions.reverse();
921
922        Ok((codes, descriptions))
923    }
924
925    // TODO(#5034): unify with `service_and_dependencies`
926    #[instrument(skip_all, fields(application_id = %application))]
927    async fn contract_and_dependencies(
928        &mut self,
929        application: ApplicationId,
930    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
931        // cyclic futures are illegal so we need to either box the frames or keep our own
932        // stack
933        let mut stack = vec![application];
934        let mut codes = vec![];
935        let mut descriptions = vec![];
936
937        while let Some(id) = stack.pop() {
938            let (code, description) = self.load_contract(id).await?;
939            stack.extend(description.required_application_ids.iter().rev().copied());
940            codes.push(code);
941            descriptions.push(description);
942        }
943
944        codes.reverse();
945        descriptions.reverse();
946
947        Ok((codes, descriptions))
948    }
949
950    #[instrument(skip_all, fields(application_id = %application_id))]
951    async fn run_user_action_with_runtime(
952        &mut self,
953        application_id: ApplicationId,
954        action: UserAction,
955        refund_grant_to: Option<Account>,
956        grant: Option<&mut Amount>,
957    ) -> Result<(), ExecutionError> {
958        let chain_id = self.state.context().extra().chain_id();
959        let mut cloned_grant = grant.as_ref().map(|x| **x);
960        let initial_balance = self
961            .resource_controller
962            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
963            .await?
964            .balance()?;
965        let mut controller = ResourceController::new(
966            self.resource_controller.policy().clone(),
967            self.resource_controller.tracker,
968            initial_balance,
969        );
970        let is_free = matches!(
971            &action,
972            UserAction::Message(..) | UserAction::ProcessStreams(..)
973        ) && self
974            .resource_controller
975            .policy()
976            .is_free_app(&application_id);
977        controller.is_free = is_free;
978        self.resource_controller.is_free = is_free;
979        let (execution_state_sender, mut execution_state_receiver) =
980            futures::channel::mpsc::unbounded();
981
982        let (codes, descriptions): (Vec<_>, Vec<_>) =
983            self.contract_and_dependencies(application_id).await?;
984
985        let allow_application_logs = self
986            .state
987            .context()
988            .extra()
989            .execution_runtime_config()
990            .allow_application_logs;
991
992        let contract_runtime_task = self
993            .state
994            .context()
995            .extra()
996            .thread_pool()
997            .run_send(JsVec(codes), move |codes| async move {
998                let runtime = ContractSyncRuntime::new(
999                    execution_state_sender,
1000                    chain_id,
1001                    refund_grant_to,
1002                    controller,
1003                    &action,
1004                    allow_application_logs,
1005                );
1006
1007                for (code, description) in codes.0.into_iter().zip(descriptions) {
1008                    runtime.preload_contract(ApplicationId::from(&description), code, description);
1009                }
1010
1011                runtime.run_action(application_id, chain_id, action)
1012            })
1013            .await;
1014
1015        async {
1016            while let Some(request) = execution_state_receiver.next().await {
1017                self.handle_request(request).await?;
1018            }
1019            Ok::<(), ExecutionError>(())
1020        }
1021        .instrument(info_span!("handle_runtime_requests"))
1022        .await?;
1023
1024        let (result, controller) = contract_runtime_task.await??;
1025
1026        self.resource_controller.is_free = false;
1027
1028        self.txn_tracker.add_operation_result(result);
1029
1030        self.resource_controller
1031            .with_state_and_grant(&mut self.state.system, grant)
1032            .await?
1033            .merge_balance(initial_balance, controller.balance()?)?;
1034        self.resource_controller.tracker = controller.tracker;
1035
1036        Ok(())
1037    }
1038
1039    #[instrument(skip_all, fields(
1040        chain_id = %context.chain_id,
1041        block_height = %context.height,
1042        operation_type = %operation.as_ref(),
1043    ))]
1044    pub async fn execute_operation(
1045        &mut self,
1046        context: OperationContext,
1047        operation: Operation,
1048    ) -> Result<(), ExecutionError> {
1049        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1050        match operation {
1051            Operation::System(op) => {
1052                let new_application = self
1053                    .state
1054                    .system
1055                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
1056                    .await?;
1057                if let Some((application_id, argument)) = new_application {
1058                    let user_action = UserAction::Instantiate(context, argument);
1059                    self.run_user_action(
1060                        application_id,
1061                        user_action,
1062                        context.refund_grant_to(),
1063                        None,
1064                    )
1065                    .await?;
1066                }
1067            }
1068            Operation::User {
1069                application_id,
1070                bytes,
1071            } => {
1072                self.run_user_action(
1073                    application_id,
1074                    UserAction::Operation(context, bytes),
1075                    context.refund_grant_to(),
1076                    None,
1077                )
1078                .await?;
1079            }
1080        }
1081        self.process_subscriptions(context.into()).await?;
1082        Ok(())
1083    }
1084
1085    #[instrument(skip_all, fields(
1086        chain_id = %context.chain_id,
1087        block_height = %context.height,
1088        origin = %context.origin,
1089        is_bouncing = %context.is_bouncing,
1090        message_type = %message.as_ref(),
1091    ))]
1092    pub async fn execute_message(
1093        &mut self,
1094        context: MessageContext,
1095        message: Message,
1096        grant: Option<&mut Amount>,
1097    ) -> Result<(), ExecutionError> {
1098        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1099        match message {
1100            Message::System(message) => {
1101                let outcome = self.state.system.execute_message(context, message).await?;
1102                self.txn_tracker.add_outgoing_messages(outcome);
1103            }
1104            Message::User {
1105                application_id,
1106                bytes,
1107            } => {
1108                self.run_user_action(
1109                    application_id,
1110                    UserAction::Message(context, bytes),
1111                    context.refund_grant_to,
1112                    grant,
1113                )
1114                .await?;
1115            }
1116        }
1117        self.process_subscriptions(context.into()).await?;
1118        Ok(())
1119    }
1120
1121    pub fn bounce_message(
1122        &mut self,
1123        context: MessageContext,
1124        grant: Amount,
1125        message: Message,
1126    ) -> Result<(), ExecutionError> {
1127        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1128        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1129            destination: context.origin,
1130            authenticated_owner: context.authenticated_owner,
1131            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1132            grant,
1133            kind: MessageKind::Bouncing,
1134            message,
1135        });
1136        Ok(())
1137    }
1138
1139    pub fn send_refund(
1140        &mut self,
1141        context: MessageContext,
1142        amount: Amount,
1143    ) -> Result<(), ExecutionError> {
1144        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1145        if amount.is_zero() {
1146            return Ok(());
1147        }
1148        let Some(account) = context.refund_grant_to else {
1149            return Err(ExecutionError::InternalError(
1150                "Messages with grants should have a non-empty `refund_grant_to`",
1151            ));
1152        };
1153        let message = SystemMessage::Credit {
1154            amount,
1155            source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1156            target: account.owner,
1157        };
1158        self.txn_tracker.add_outgoing_message(
1159            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1160        );
1161        Ok(())
1162    }
1163
1164    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1165    ///
1166    /// Ensures that the response does not exceed the provided `size_limit`.
1167    async fn receive_http_response(
1168        response: reqwest::Response,
1169        size_limit: u64,
1170    ) -> Result<http::Response, ExecutionError> {
1171        let status = response.status().as_u16();
1172        let maybe_content_length = response.content_length();
1173
1174        let headers = response
1175            .headers()
1176            .iter()
1177            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1178            .collect::<Vec<_>>();
1179
1180        let total_header_size = headers
1181            .iter()
1182            .map(|header| (header.name.len() + header.value.len()) as u64)
1183            .sum();
1184
1185        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1186            ExecutionError::HttpResponseSizeLimitExceeded {
1187                limit: size_limit,
1188                size: total_header_size,
1189            },
1190        )?;
1191
1192        if let Some(content_length) = maybe_content_length {
1193            if content_length > remaining_bytes {
1194                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1195                    limit: size_limit,
1196                    size: content_length + total_header_size,
1197                });
1198            }
1199        }
1200
1201        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1202        let mut body_stream = response.bytes_stream();
1203
1204        while let Some(bytes) = body_stream.next().await.transpose()? {
1205            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1206                ExecutionError::HttpResponseSizeLimitExceeded {
1207                    limit: size_limit,
1208                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1209                },
1210            )?;
1211
1212            body.extend(&bytes);
1213        }
1214
1215        Ok(http::Response {
1216            status,
1217            headers,
1218            body,
1219        })
1220    }
1221}
1222
1223/// Requests to the execution state.
1224#[derive(Debug, strum::AsRefStr)]
1225pub enum ExecutionRequest {
1226    #[cfg(not(web))]
1227    LoadContract {
1228        id: ApplicationId,
1229        #[debug(skip)]
1230        callback: Sender<(UserContractCode, ApplicationDescription)>,
1231    },
1232
1233    #[cfg(not(web))]
1234    LoadService {
1235        id: ApplicationId,
1236        #[debug(skip)]
1237        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1238    },
1239
1240    ChainBalance {
1241        #[debug(skip)]
1242        callback: Sender<Amount>,
1243    },
1244
1245    OwnerBalance {
1246        owner: AccountOwner,
1247        #[debug(skip)]
1248        callback: Sender<Amount>,
1249    },
1250
1251    OwnerBalances {
1252        #[debug(skip)]
1253        callback: Sender<Vec<(AccountOwner, Amount)>>,
1254    },
1255
1256    BalanceOwners {
1257        #[debug(skip)]
1258        callback: Sender<Vec<AccountOwner>>,
1259    },
1260
1261    Allowance {
1262        owner: AccountOwner,
1263        spender: AccountOwner,
1264        #[debug(skip)]
1265        callback: Sender<Amount>,
1266    },
1267
1268    Allowances {
1269        #[debug(skip)]
1270        callback: Sender<Vec<(AccountOwner, AccountOwner, Amount)>>,
1271    },
1272
1273    Transfer {
1274        source: AccountOwner,
1275        destination: Account,
1276        amount: Amount,
1277        #[debug(skip_if = Option::is_none)]
1278        signer: Option<AccountOwner>,
1279        application_id: ApplicationId,
1280        #[debug(skip)]
1281        callback: Sender<()>,
1282    },
1283
1284    Claim {
1285        source: Account,
1286        destination: Account,
1287        amount: Amount,
1288        #[debug(skip_if = Option::is_none)]
1289        signer: Option<AccountOwner>,
1290        application_id: ApplicationId,
1291        #[debug(skip)]
1292        callback: Sender<()>,
1293    },
1294
1295    Approve {
1296        owner: AccountOwner,
1297        spender: AccountOwner,
1298        amount: Amount,
1299        #[debug(skip_if = Option::is_none)]
1300        signer: Option<AccountOwner>,
1301        application_id: ApplicationId,
1302        #[debug(skip)]
1303        callback: Sender<()>,
1304    },
1305
1306    TransferFrom {
1307        owner: AccountOwner,
1308        spender: AccountOwner,
1309        destination: Account,
1310        amount: Amount,
1311        #[debug(skip_if = Option::is_none)]
1312        signer: Option<AccountOwner>,
1313        application_id: ApplicationId,
1314        #[debug(skip)]
1315        callback: Sender<()>,
1316    },
1317
1318    SystemTimestamp {
1319        #[debug(skip)]
1320        callback: Sender<Timestamp>,
1321    },
1322
1323    ChainOwnership {
1324        #[debug(skip)]
1325        callback: Sender<ChainOwnership>,
1326    },
1327
1328    ApplicationPermissions {
1329        #[debug(skip)]
1330        callback: Sender<ApplicationPermissions>,
1331    },
1332
1333    ReadApplicationDescription {
1334        application_id: ApplicationId,
1335        #[debug(skip)]
1336        callback: Sender<ApplicationDescription>,
1337    },
1338
1339    ReadValueBytes {
1340        id: ApplicationId,
1341        #[debug(with = hex_debug)]
1342        key: Vec<u8>,
1343        #[debug(skip)]
1344        callback: Sender<Option<Vec<u8>>>,
1345    },
1346
1347    ContainsKey {
1348        id: ApplicationId,
1349        key: Vec<u8>,
1350        #[debug(skip)]
1351        callback: Sender<bool>,
1352    },
1353
1354    ContainsKeys {
1355        id: ApplicationId,
1356        #[debug(with = hex_vec_debug)]
1357        keys: Vec<Vec<u8>>,
1358        callback: Sender<Vec<bool>>,
1359    },
1360
1361    ReadMultiValuesBytes {
1362        id: ApplicationId,
1363        #[debug(with = hex_vec_debug)]
1364        keys: Vec<Vec<u8>>,
1365        #[debug(skip)]
1366        callback: Sender<Vec<Option<Vec<u8>>>>,
1367    },
1368
1369    FindKeysByPrefix {
1370        id: ApplicationId,
1371        #[debug(with = hex_debug)]
1372        key_prefix: Vec<u8>,
1373        #[debug(skip)]
1374        callback: Sender<Vec<Vec<u8>>>,
1375    },
1376
1377    FindKeyValuesByPrefix {
1378        id: ApplicationId,
1379        #[debug(with = hex_debug)]
1380        key_prefix: Vec<u8>,
1381        #[debug(skip)]
1382        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1383    },
1384
1385    WriteBatch {
1386        id: ApplicationId,
1387        batch: Batch,
1388        #[debug(skip)]
1389        callback: Sender<()>,
1390    },
1391
1392    OpenChain {
1393        ownership: ChainOwnership,
1394        #[debug(skip_if = Amount::is_zero)]
1395        balance: Amount,
1396        parent_id: ChainId,
1397        block_height: BlockHeight,
1398        application_permissions: ApplicationPermissions,
1399        timestamp: Timestamp,
1400        #[debug(skip)]
1401        callback: Sender<ChainId>,
1402    },
1403
1404    CloseChain {
1405        application_id: ApplicationId,
1406        #[debug(skip)]
1407        callback: Sender<Result<(), ExecutionError>>,
1408    },
1409
1410    ChangeOwnership {
1411        application_id: ApplicationId,
1412        ownership: ChainOwnership,
1413        #[debug(skip)]
1414        callback: Sender<Result<(), ExecutionError>>,
1415    },
1416
1417    ChangeApplicationPermissions {
1418        application_id: ApplicationId,
1419        application_permissions: ApplicationPermissions,
1420        #[debug(skip)]
1421        callback: Sender<Result<(), ExecutionError>>,
1422    },
1423
1424    PeekApplicationIndex {
1425        #[debug(skip)]
1426        callback: Sender<u32>,
1427    },
1428
1429    CreateApplication {
1430        chain_id: ChainId,
1431        block_height: BlockHeight,
1432        module_id: ModuleId,
1433        parameters: Vec<u8>,
1434        required_application_ids: Vec<ApplicationId>,
1435        #[debug(skip)]
1436        callback: Sender<CreateApplicationResult>,
1437    },
1438
1439    PerformHttpRequest {
1440        request: http::Request,
1441        http_responses_are_oracle_responses: bool,
1442        #[debug(skip)]
1443        callback: Sender<http::Response>,
1444    },
1445
1446    ReadBlobContent {
1447        blob_id: BlobId,
1448        #[debug(skip)]
1449        callback: Sender<BlobContent>,
1450    },
1451
1452    AssertBlobExists {
1453        blob_id: BlobId,
1454        #[debug(skip)]
1455        callback: Sender<()>,
1456    },
1457
1458    Emit {
1459        stream_id: StreamId,
1460        #[debug(with = hex_debug)]
1461        value: Vec<u8>,
1462        #[debug(skip)]
1463        callback: Sender<u32>,
1464    },
1465
1466    ReadEvent {
1467        event_id: EventId,
1468        callback: oneshot::Sender<Vec<u8>>,
1469    },
1470
1471    SubscribeToEvents {
1472        chain_id: ChainId,
1473        stream_id: StreamId,
1474        subscriber_app_id: ApplicationId,
1475        #[debug(skip)]
1476        callback: Sender<()>,
1477    },
1478
1479    UnsubscribeFromEvents {
1480        chain_id: ChainId,
1481        stream_id: StreamId,
1482        subscriber_app_id: ApplicationId,
1483        #[debug(skip)]
1484        callback: Sender<()>,
1485    },
1486
1487    GetApplicationPermissions {
1488        #[debug(skip)]
1489        callback: Sender<ApplicationPermissions>,
1490    },
1491
1492    QueryServiceOracle {
1493        deadline: Option<Instant>,
1494        application_id: ApplicationId,
1495        next_block_height: BlockHeight,
1496        query: Vec<u8>,
1497        #[debug(skip)]
1498        callback: Sender<Vec<u8>>,
1499    },
1500
1501    AddOutgoingMessage {
1502        message: crate::OutgoingMessage,
1503        #[debug(skip)]
1504        callback: Sender<()>,
1505    },
1506
1507    SetLocalTime {
1508        local_time: Timestamp,
1509        #[debug(skip)]
1510        callback: Sender<()>,
1511    },
1512
1513    AssertBefore {
1514        timestamp: Timestamp,
1515        #[debug(skip)]
1516        callback: Sender<Result<(), ExecutionError>>,
1517    },
1518
1519    AddCreatedBlob {
1520        blob: crate::Blob,
1521        #[debug(skip)]
1522        callback: Sender<()>,
1523    },
1524
1525    ValidationRound {
1526        round: Option<u32>,
1527        #[debug(skip)]
1528        callback: Sender<Option<u32>>,
1529    },
1530
1531    TotalStorageSize {
1532        application: ApplicationId,
1533        #[debug(skip)]
1534        callback: Sender<(u32, u32)>,
1535    },
1536
1537    AllowApplicationLogs {
1538        #[debug(skip)]
1539        callback: Sender<bool>,
1540    },
1541
1542    /// Log message from contract execution (fire-and-forget, no callback needed).
1543    #[cfg(web)]
1544    Log {
1545        message: String,
1546        level: tracing::log::Level,
1547    },
1548}