linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::{
7    collections::{BTreeMap, BTreeSet},
8    sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16    data_types::{
17        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18        Timestamp,
19    },
20    ensure, hex_debug, hex_vec_debug, http,
21    identifiers::{
22        Account, AccountOwner, BlobId, BlobType, ChainId, EventId, OwnerSpender, StreamId,
23    },
24    ownership::ChainOwnership,
25    time::Instant,
26};
27use linera_views::{batch::Batch, context::Context, views::View};
28use oneshot::Sender;
29use reqwest::{header::HeaderMap, Client, Url};
30use tracing::{info_span, instrument, Instrument as _};
31
32use crate::{
33    execution::UserAction,
34    runtime::ContractSyncRuntime,
35    system::{CreateApplicationResult, OpenChainConfig},
36    util::{OracleResponseExt as _, RespondExt as _},
37    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
38    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
39    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
40    ResourceController, SystemMessage, TransactionTracker, UserContractCode, UserServiceCode,
41};
42
43/// Actor for handling requests to the execution state.
44pub struct ExecutionStateActor<'a, C> {
45    state: &'a mut ExecutionStateView<C>,
46    txn_tracker: &'a mut TransactionTracker,
47    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
48}
49
50#[cfg(with_metrics)]
51mod metrics {
52    use std::sync::LazyLock;
53
54    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
55    use prometheus::HistogramVec;
56
57    /// Histogram of the latency to load a contract bytecode.
58    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
59        register_histogram_vec(
60            "load_contract_latency",
61            "Load contract latency",
62            &[],
63            exponential_bucket_latencies(250.0),
64        )
65    });
66
67    /// Histogram of the latency to load a service bytecode.
68    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
69        register_histogram_vec(
70            "load_service_latency",
71            "Load service latency",
72            &[],
73            exponential_bucket_latencies(250.0),
74        )
75    });
76}
77
78pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
79
80impl<'a, C> ExecutionStateActor<'a, C>
81where
82    C: Context + Clone + 'static,
83    C::Extra: ExecutionRuntimeContext,
84{
85    /// Creates a new execution state actor.
86    pub fn new(
87        state: &'a mut ExecutionStateView<C>,
88        txn_tracker: &'a mut TransactionTracker,
89        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
90    ) -> Self {
91        Self {
92            state,
93            txn_tracker,
94            resource_controller,
95        }
96    }
97
98    #[instrument(skip_all, fields(application_id = %id))]
99    pub(crate) async fn load_contract(
100        &mut self,
101        id: ApplicationId,
102    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
103        #[cfg(with_metrics)]
104        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
105        let blob_id = id.description_blob_id();
106        let description = match self.txn_tracker.get_blob_content(&blob_id) {
107            Some(blob) => bcs::from_bytes(blob.bytes())?,
108            None => {
109                self.state
110                    .system
111                    .describe_application(id, self.txn_tracker)
112                    .await?
113            }
114        };
115        let code = self
116            .state
117            .context()
118            .extra()
119            .get_user_contract(&description, self.txn_tracker)
120            .await?;
121        Ok((code, description))
122    }
123
124    pub(crate) async fn load_service(
125        &mut self,
126        id: ApplicationId,
127    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
128        #[cfg(with_metrics)]
129        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
130        let blob_id = id.description_blob_id();
131        let description = match self.txn_tracker.get_blob_content(&blob_id) {
132            Some(blob) => bcs::from_bytes(blob.bytes())?,
133            None => {
134                self.state
135                    .system
136                    .describe_application(id, self.txn_tracker)
137                    .await?
138            }
139        };
140        let code = self
141            .state
142            .context()
143            .extra()
144            .get_user_service(&description, self.txn_tracker)
145            .await?;
146        Ok((code, description))
147    }
148
149    // TODO(#1416): Support concurrent I/O.
150    #[instrument(
151        skip_all,
152        fields(request_type = %request.as_ref())
153    )]
154    pub(crate) async fn handle_request(
155        &mut self,
156        request: ExecutionRequest,
157    ) -> Result<(), ExecutionError> {
158        use ExecutionRequest::*;
159        match request {
160            #[cfg(not(web))]
161            LoadContract { id, callback } => {
162                let (code, description) = self.load_contract(id).await?;
163                callback.respond((code, description))
164            }
165            #[cfg(not(web))]
166            LoadService { id, callback } => {
167                let (code, description) = self.load_service(id).await?;
168                callback.respond((code, description))
169            }
170
171            ChainBalance { callback } => {
172                let balance = *self.state.system.balance.get();
173                callback.respond(balance);
174            }
175
176            OwnerBalance { owner, callback } => {
177                let balance = self
178                    .state
179                    .system
180                    .balances
181                    .get(&owner)
182                    .await?
183                    .unwrap_or_default();
184                callback.respond(balance);
185            }
186
187            OwnerBalances { callback } => {
188                callback.respond(self.state.system.balances.index_values().await?);
189            }
190
191            BalanceOwners { callback } => {
192                let owners = self.state.system.balances.indices().await?;
193                callback.respond(owners);
194            }
195
196            Allowance {
197                owner,
198                spender,
199                callback,
200            } => {
201                let owner_spender = OwnerSpender::new(owner, spender);
202                let allowance = self
203                    .state
204                    .system
205                    .allowances
206                    .get(&owner_spender)
207                    .await?
208                    .unwrap_or_default();
209                callback.respond(allowance);
210            }
211
212            Allowances { callback } => {
213                let entries: Vec<_> = self
214                    .state
215                    .system
216                    .allowances
217                    .index_values()
218                    .await?
219                    .into_iter()
220                    .map(|(os, amount)| (os.owner, os.spender, amount))
221                    .collect();
222                callback.respond(entries);
223            }
224
225            Transfer {
226                source,
227                destination,
228                amount,
229                signer,
230                application_id,
231                callback,
232            } => {
233                let maybe_message = self
234                    .state
235                    .system
236                    .transfer(signer, Some(application_id), source, destination, amount)
237                    .await?;
238                self.txn_tracker.add_outgoing_messages(maybe_message);
239                callback.respond(());
240            }
241
242            Claim {
243                source,
244                destination,
245                amount,
246                signer,
247                application_id,
248                callback,
249            } => {
250                let maybe_message = self
251                    .state
252                    .system
253                    .claim(
254                        signer,
255                        Some(application_id),
256                        source.owner,
257                        source.chain_id,
258                        destination,
259                        amount,
260                    )
261                    .await?;
262                self.txn_tracker.add_outgoing_messages(maybe_message);
263                callback.respond(());
264            }
265
266            Approve {
267                owner,
268                spender,
269                amount,
270                signer,
271                application_id,
272                callback,
273            } => {
274                self.state
275                    .system
276                    .approve(signer, Some(application_id), owner, spender, amount)
277                    .await?;
278                callback.respond(());
279            }
280
281            TransferFrom {
282                owner,
283                spender,
284                destination,
285                amount,
286                signer,
287                application_id,
288                callback,
289            } => {
290                let maybe_message = self
291                    .state
292                    .system
293                    .transfer_from(
294                        signer,
295                        Some(application_id),
296                        owner,
297                        spender,
298                        destination,
299                        amount,
300                    )
301                    .await?;
302                self.txn_tracker.add_outgoing_messages(maybe_message);
303                callback.respond(());
304            }
305
306            SystemTimestamp { callback } => {
307                let timestamp = *self.state.system.timestamp.get();
308                callback.respond(timestamp);
309            }
310
311            ChainOwnership { callback } => {
312                let ownership = self.state.system.ownership.get().await?.clone();
313                callback.respond(ownership);
314            }
315
316            ApplicationPermissions { callback } => {
317                let permissions = self
318                    .state
319                    .system
320                    .application_permissions
321                    .get()
322                    .await?
323                    .clone();
324                callback.respond(permissions);
325            }
326
327            ReadApplicationDescription {
328                application_id,
329                callback,
330            } => {
331                let blob_id = application_id.description_blob_id();
332                let description = match self.txn_tracker.get_blob_content(&blob_id) {
333                    Some(blob) => bcs::from_bytes(blob.bytes())?,
334                    None => {
335                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
336                        self.state
337                            .system
338                            .blob_used(self.txn_tracker, blob_id)
339                            .await?;
340                        bcs::from_bytes(blob_content.bytes())?
341                    }
342                };
343                callback.respond(description);
344            }
345
346            ContainsKey { id, key, callback } => {
347                let view = self.state.users.try_load_entry(&id).await?;
348                let result = match view {
349                    Some(view) => view.contains_key(&key).await?,
350                    None => false,
351                };
352                callback.respond(result);
353            }
354
355            ContainsKeys { id, keys, callback } => {
356                let view = self.state.users.try_load_entry(&id).await?;
357                let result = match view {
358                    Some(view) => view.contains_keys(&keys).await?,
359                    None => vec![false; keys.len()],
360                };
361                callback.respond(result);
362            }
363
364            ReadMultiValuesBytes { id, keys, callback } => {
365                let view = self.state.users.try_load_entry(&id).await?;
366                let values = match view {
367                    Some(view) => view.multi_get(&keys).await?,
368                    None => vec![None; keys.len()],
369                };
370                callback.respond(values);
371            }
372
373            ReadValueBytes { id, key, callback } => {
374                let view = self.state.users.try_load_entry(&id).await?;
375                let result = match view {
376                    Some(view) => view.get(&key).await?,
377                    None => None,
378                };
379                callback.respond(result);
380            }
381
382            FindKeysByPrefix {
383                id,
384                key_prefix,
385                callback,
386            } => {
387                let view = self.state.users.try_load_entry(&id).await?;
388                let result = match view {
389                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
390                    None => Vec::new(),
391                };
392                callback.respond(result);
393            }
394
395            FindKeyValuesByPrefix {
396                id,
397                key_prefix,
398                callback,
399            } => {
400                let view = self.state.users.try_load_entry(&id).await?;
401                let result = match view {
402                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
403                    None => Vec::new(),
404                };
405                callback.respond(result);
406            }
407
408            WriteBatch {
409                id,
410                batch,
411                callback,
412            } => {
413                let mut view = self.state.users.try_load_entry_mut(&id).await?;
414                view.write_batch(batch).await?;
415                callback.respond(());
416            }
417
418            OpenChain {
419                ownership,
420                balance,
421                parent_id,
422                block_height,
423                application_permissions,
424                timestamp,
425                callback,
426            } => {
427                let config = OpenChainConfig {
428                    ownership,
429                    balance,
430                    application_permissions,
431                };
432                let chain_id = self
433                    .state
434                    .system
435                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
436                    .await?;
437                callback.respond(chain_id);
438            }
439
440            CloseChain {
441                application_id,
442                callback,
443            } => {
444                let app_permissions = self.state.system.application_permissions.get().await?;
445                if !app_permissions.can_manage_chain(&application_id) {
446                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
447                } else {
448                    self.state.system.close_chain();
449                    callback.respond(Ok(()));
450                }
451            }
452
453            ChangeOwnership {
454                application_id,
455                ownership,
456                callback,
457            } => {
458                let app_permissions = self.state.system.application_permissions.get().await?;
459                if !app_permissions.can_manage_chain(&application_id) {
460                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
461                } else {
462                    self.state.system.ownership.set(ownership);
463                    callback.respond(Ok(()));
464                }
465            }
466
467            ChangeApplicationPermissions {
468                application_id,
469                application_permissions,
470                callback,
471            } => {
472                let app_permissions = self.state.system.application_permissions.get().await?;
473                if !app_permissions.can_manage_chain(&application_id) {
474                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
475                } else {
476                    self.state
477                        .system
478                        .application_permissions
479                        .set(application_permissions);
480                    callback.respond(Ok(()));
481                }
482            }
483
484            PeekApplicationIndex { callback } => {
485                let index = self.txn_tracker.peek_application_index();
486                callback.respond(index)
487            }
488
489            CreateApplication {
490                chain_id,
491                block_height,
492                module_id,
493                parameters,
494                required_application_ids,
495                callback,
496            } => {
497                let create_application_result = self
498                    .state
499                    .system
500                    .create_application(
501                        chain_id,
502                        block_height,
503                        module_id,
504                        parameters,
505                        required_application_ids,
506                        self.txn_tracker,
507                    )
508                    .await?;
509                callback.respond(create_application_result);
510            }
511
512            PerformHttpRequest {
513                request,
514                http_responses_are_oracle_responses,
515                callback,
516            } => {
517                let system = &mut self.state.system;
518                let response = self
519                    .txn_tracker
520                    .oracle(|| async {
521                        let headers = request
522                            .headers
523                            .into_iter()
524                            .map(|http::Header { name, value }| {
525                                Ok((name.parse()?, value.try_into()?))
526                            })
527                            .collect::<Result<HeaderMap, ExecutionError>>()?;
528
529                        let url = Url::parse(&request.url)?;
530                        let host = url
531                            .host_str()
532                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
533
534                        let (_epoch, committee) = system
535                            .current_committee()
536                            .await?
537                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
538                        let allowed_hosts = &committee.policy().http_request_allow_list;
539
540                        ensure!(
541                            allowed_hosts.contains(host),
542                            ExecutionError::UnauthorizedHttpRequest(url)
543                        );
544
545                        let request = Client::new()
546                            .request(request.method.into(), url)
547                            .body(request.body)
548                            .headers(headers);
549                        #[cfg(not(web))]
550                        let request = request.timeout(linera_base::time::Duration::from_millis(
551                            committee.policy().http_request_timeout_ms,
552                        ));
553
554                        let response = request.send().await?;
555
556                        let mut response_size_limit =
557                            committee.policy().maximum_http_response_bytes;
558
559                        if http_responses_are_oracle_responses {
560                            response_size_limit = response_size_limit
561                                .min(committee.policy().maximum_oracle_response_bytes);
562                        }
563                        Ok(OracleResponse::Http(
564                            Self::receive_http_response(response, response_size_limit).await?,
565                        ))
566                    })
567                    .await?
568                    .to_http_response()?;
569                callback.respond(response);
570            }
571
572            ReadBlobContent { blob_id, callback } => {
573                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
574                    content.clone()
575                } else {
576                    let content = self.state.system.read_blob_content(blob_id).await?;
577                    if blob_id.blob_type == BlobType::Data {
578                        self.resource_controller
579                            .with_state(&mut self.state.system)
580                            .await?
581                            .track_blob_read(content.bytes().len() as u64)?;
582                    }
583                    self.state
584                        .system
585                        .blob_used(self.txn_tracker, blob_id)
586                        .await?;
587                    content
588                };
589                callback.respond(content)
590            }
591
592            AssertBlobExists { blob_id, callback } => {
593                self.state.system.assert_blob_exists(blob_id).await?;
594                // Treating this as reading a size-0 blob for fee purposes.
595                if blob_id.blob_type == BlobType::Data {
596                    self.resource_controller
597                        .with_state(&mut self.state.system)
598                        .await?
599                        .track_blob_read(0)?;
600                }
601                let is_new = self
602                    .state
603                    .system
604                    .blob_used(self.txn_tracker, blob_id)
605                    .await?;
606                if is_new {
607                    self.txn_tracker
608                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
609                }
610                callback.respond(());
611            }
612
613            Emit {
614                stream_id,
615                value,
616                callback,
617            } => {
618                let count = self
619                    .state
620                    .system
621                    .stream_event_counts
622                    .get_mut_or_default(&stream_id)
623                    .await?;
624                let index = *count;
625                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
626                self.resource_controller
627                    .with_state(&mut self.state.system)
628                    .await?
629                    .track_event_published(&value)?;
630                self.txn_tracker.add_event(stream_id, index, value);
631                callback.respond(index)
632            }
633
634            ReadEvent { event_id, callback } => {
635                let context = self.state.context();
636                let extra = context.extra();
637                let event = self
638                    .txn_tracker
639                    .oracle(|| async {
640                        let event = extra
641                            .get_event(event_id.clone())
642                            .await?
643                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
644                        Ok(OracleResponse::Event(
645                            event_id.clone(),
646                            Arc::unwrap_or_clone(event),
647                        ))
648                    })
649                    .await?
650                    .to_event(&event_id)?;
651                self.resource_controller
652                    .with_state(&mut self.state.system)
653                    .await?
654                    .track_event_read(event.len() as u64)?;
655                callback.respond(event);
656            }
657
658            SubscribeToEvents {
659                chain_id,
660                stream_id,
661                subscriber_app_id,
662                callback,
663            } => {
664                let subscriptions = self
665                    .state
666                    .system
667                    .event_subscriptions
668                    .get_mut_or_default(&(chain_id, stream_id.clone()))
669                    .await?;
670                let next_index = match subscriptions.applications.entry(subscriber_app_id) {
671                    std::collections::btree_map::Entry::Vacant(entry) => {
672                        entry.insert(0);
673                        subscriptions.min_next_index = 0;
674                        0
675                    }
676                    std::collections::btree_map::Entry::Occupied(entry) => *entry.get(),
677                };
678                self.txn_tracker.add_stream_to_process(
679                    subscriber_app_id,
680                    chain_id,
681                    stream_id,
682                    0,
683                    next_index,
684                );
685                callback.respond(());
686            }
687
688            UnsubscribeFromEvents {
689                chain_id,
690                stream_id,
691                subscriber_app_id,
692                callback,
693            } => {
694                let key = (chain_id, stream_id.clone());
695                let subscriptions = self
696                    .state
697                    .system
698                    .event_subscriptions
699                    .get_mut_or_default(&key)
700                    .await?;
701                subscriptions.applications.remove(&subscriber_app_id);
702                if subscriptions.applications.is_empty() {
703                    self.state.system.event_subscriptions.remove(&key)?;
704                } else {
705                    subscriptions.recalculate_min();
706                }
707                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
708                    self.txn_tracker
709                        .remove_stream_to_process(app_id, chain_id, stream_id);
710                }
711                callback.respond(());
712            }
713
714            GetApplicationPermissions { callback } => {
715                let app_permissions = self.state.system.application_permissions.get().await?;
716                callback.respond(app_permissions.clone());
717            }
718
719            QueryServiceOracle {
720                deadline,
721                application_id,
722                next_block_height,
723                query,
724                callback,
725            } => {
726                let state = &mut self.state;
727                let local_time = self.txn_tracker.local_time();
728                let created_blobs = self.txn_tracker.created_blobs().clone();
729                let bytes = self
730                    .txn_tracker
731                    .oracle(|| async {
732                        let context = QueryContext {
733                            chain_id: state.context().extra().chain_id(),
734                            next_block_height,
735                            local_time,
736                        };
737                        let QueryOutcome {
738                            response,
739                            operations,
740                        } = Box::pin(state.query_user_application_with_deadline(
741                            application_id,
742                            context,
743                            query,
744                            deadline,
745                            created_blobs,
746                        ))
747                        .await?;
748                        ensure!(
749                            operations.is_empty(),
750                            ExecutionError::ServiceOracleQueryOperations(operations)
751                        );
752                        Ok(OracleResponse::Service(response))
753                    })
754                    .await?
755                    .to_service_response()?;
756                callback.respond(bytes);
757            }
758
759            AddOutgoingMessage { message, callback } => {
760                self.txn_tracker.add_outgoing_message(message);
761                callback.respond(());
762            }
763
764            SetLocalTime {
765                local_time,
766                callback,
767            } => {
768                self.txn_tracker.set_local_time(local_time);
769                callback.respond(());
770            }
771
772            AssertBefore {
773                timestamp,
774                callback,
775            } => {
776                let result = if !self
777                    .txn_tracker
778                    .replay_oracle_response(OracleResponse::Assert)?
779                {
780                    // There are no recorded oracle responses, so we check the local time.
781                    let local_time = self.txn_tracker.local_time();
782                    if local_time >= timestamp {
783                        Err(ExecutionError::AssertBefore {
784                            timestamp,
785                            local_time,
786                        })
787                    } else {
788                        Ok(())
789                    }
790                } else {
791                    Ok(())
792                };
793                callback.respond(result);
794            }
795
796            AddCreatedBlob { blob, callback } => {
797                if self.resource_controller.is_free {
798                    self.txn_tracker.mark_blob_free(blob.id());
799                }
800                self.txn_tracker.add_created_blob(blob);
801                callback.respond(());
802            }
803
804            ValidationRound { round, callback } => {
805                let validation_round = self
806                    .txn_tracker
807                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
808                    .await?
809                    .to_round()?;
810                callback.respond(validation_round);
811            }
812
813            TotalStorageSize {
814                application,
815                callback,
816            } => {
817                let view = self.state.users.try_load_entry(&application).await?;
818                let result = match view {
819                    Some(view) => {
820                        let total_size = view.total_size();
821                        (total_size.key, total_size.value)
822                    }
823                    None => (0, 0),
824                };
825                callback.respond(result);
826            }
827
828            AllowApplicationLogs { callback } => {
829                let allow = self
830                    .state
831                    .context()
832                    .extra()
833                    .execution_runtime_config()
834                    .allow_application_logs;
835                callback.respond(allow);
836            }
837
838            #[cfg(web)]
839            Log { message, level } => match level {
840                tracing::log::Level::Trace | tracing::log::Level::Debug => {
841                    tracing::debug!(target: "user_application_log", message = %message);
842                }
843                tracing::log::Level::Info => {
844                    tracing::info!(target: "user_application_log", message = %message);
845                }
846                tracing::log::Level::Warn => {
847                    tracing::warn!(target: "user_application_log", message = %message);
848                }
849                tracing::log::Level::Error => {
850                    tracing::error!(target: "user_application_log", message = %message);
851                }
852            },
853        }
854
855        Ok(())
856    }
857
858    /// Calls `process_streams` for all applications that are subscribed to streams with new
859    /// events or that have new subscriptions.
860    #[instrument(skip_all)]
861    async fn process_subscriptions(
862        &mut self,
863        context: ProcessStreamsContext,
864    ) -> Result<(), ExecutionError> {
865        // Keep track of which streams we have already processed. This is to guard against
866        // applications unsubscribing and subscribing in the process_streams call itself.
867        let mut processed = BTreeSet::new();
868        loop {
869            let to_process = self
870                .txn_tracker
871                .take_streams_to_process()
872                .into_iter()
873                .filter_map(|(app_id, updates)| {
874                    let updates = updates
875                        .into_iter()
876                        .filter_map(|update| {
877                            if !processed.insert((
878                                app_id,
879                                update.chain_id,
880                                update.stream_id.clone(),
881                            )) {
882                                return None;
883                            }
884                            Some(update)
885                        })
886                        .collect::<Vec<_>>();
887                    if updates.is_empty() {
888                        return None;
889                    }
890                    Some((app_id, updates))
891                })
892                .collect::<BTreeMap<_, _>>();
893            if to_process.is_empty() {
894                return Ok(());
895            }
896            for (app_id, updates) in to_process {
897                self.run_user_action(
898                    app_id,
899                    UserAction::ProcessStreams(context, updates),
900                    None,
901                    None,
902                )
903                .await?;
904            }
905        }
906    }
907
908    pub(crate) async fn run_user_action(
909        &mut self,
910        application_id: ApplicationId,
911        action: UserAction,
912        refund_grant_to: Option<Account>,
913        grant: Option<&mut Amount>,
914    ) -> Result<(), ExecutionError> {
915        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
916            .await
917    }
918
919    // TODO(#5034): unify with `contract_and_dependencies`
920    pub(crate) async fn service_and_dependencies(
921        &mut self,
922        application: ApplicationId,
923    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
924        // cyclic futures are illegal so we need to either box the frames or keep our own
925        // stack
926        let mut stack = vec![application];
927        let mut codes = vec![];
928        let mut descriptions = vec![];
929
930        while let Some(id) = stack.pop() {
931            let (code, description) = self.load_service(id).await?;
932            stack.extend(description.required_application_ids.iter().rev().copied());
933            codes.push(code);
934            descriptions.push(description);
935        }
936
937        codes.reverse();
938        descriptions.reverse();
939
940        Ok((codes, descriptions))
941    }
942
943    // TODO(#5034): unify with `service_and_dependencies`
944    #[instrument(skip_all, fields(application_id = %application))]
945    async fn contract_and_dependencies(
946        &mut self,
947        application: ApplicationId,
948    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
949        // cyclic futures are illegal so we need to either box the frames or keep our own
950        // stack
951        let mut stack = vec![application];
952        let mut codes = vec![];
953        let mut descriptions = vec![];
954
955        while let Some(id) = stack.pop() {
956            let (code, description) = self.load_contract(id).await?;
957            stack.extend(description.required_application_ids.iter().rev().copied());
958            codes.push(code);
959            descriptions.push(description);
960        }
961
962        codes.reverse();
963        descriptions.reverse();
964
965        Ok((codes, descriptions))
966    }
967
968    #[instrument(skip_all, fields(application_id = %application_id))]
969    async fn run_user_action_with_runtime(
970        &mut self,
971        application_id: ApplicationId,
972        action: UserAction,
973        refund_grant_to: Option<Account>,
974        grant: Option<&mut Amount>,
975    ) -> Result<(), ExecutionError> {
976        let chain_id = self.state.context().extra().chain_id();
977        let mut cloned_grant = grant.as_ref().map(|x| **x);
978        let initial_balance = self
979            .resource_controller
980            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
981            .await?
982            .balance()?;
983        let mut controller = ResourceController::new(
984            self.resource_controller.policy().clone(),
985            self.resource_controller.tracker,
986            initial_balance,
987        );
988        let is_free = matches!(
989            &action,
990            UserAction::Message(..) | UserAction::ProcessStreams(..)
991        ) && self
992            .resource_controller
993            .policy()
994            .is_free_app(&application_id);
995        controller.is_free = is_free;
996        self.resource_controller.is_free = is_free;
997        let (execution_state_sender, mut execution_state_receiver) =
998            futures::channel::mpsc::unbounded();
999
1000        let (codes, descriptions): (Vec<_>, Vec<_>) =
1001            self.contract_and_dependencies(application_id).await?;
1002
1003        let allow_application_logs = self
1004            .state
1005            .context()
1006            .extra()
1007            .execution_runtime_config()
1008            .allow_application_logs;
1009
1010        let contract_runtime_task = self
1011            .state
1012            .context()
1013            .extra()
1014            .thread_pool()
1015            .run_send(JsVec(codes), move |codes| async move {
1016                let runtime = ContractSyncRuntime::new(
1017                    execution_state_sender,
1018                    chain_id,
1019                    refund_grant_to,
1020                    controller,
1021                    &action,
1022                    allow_application_logs,
1023                );
1024
1025                for (code, description) in codes.0.into_iter().zip(descriptions) {
1026                    runtime.preload_contract(ApplicationId::from(&description), code, description);
1027                }
1028
1029                runtime.run_action(application_id, chain_id, action)
1030            })
1031            .await;
1032
1033        async {
1034            while let Some(request) = execution_state_receiver.next().await {
1035                self.handle_request(request).await?;
1036            }
1037            Ok::<(), ExecutionError>(())
1038        }
1039        .instrument(info_span!("handle_runtime_requests"))
1040        .await?;
1041
1042        let (result, controller) = contract_runtime_task.await??;
1043
1044        self.resource_controller.is_free = false;
1045
1046        self.txn_tracker.add_operation_result(result);
1047
1048        self.resource_controller
1049            .with_state_and_grant(&mut self.state.system, grant)
1050            .await?
1051            .merge_balance(initial_balance, controller.balance()?)?;
1052        self.resource_controller.tracker = controller.tracker;
1053
1054        Ok(())
1055    }
1056
1057    #[instrument(skip_all, fields(
1058        chain_id = %context.chain_id,
1059        block_height = %context.height,
1060        operation_type = %operation.as_ref(),
1061    ))]
1062    pub async fn execute_operation(
1063        &mut self,
1064        context: OperationContext,
1065        operation: Operation,
1066    ) -> Result<(), ExecutionError> {
1067        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1068        match operation {
1069            Operation::System(op) => {
1070                let new_application = self
1071                    .state
1072                    .system
1073                    .execute_operation(context, *op, self.txn_tracker, self.resource_controller)
1074                    .await?;
1075                if let Some((application_id, argument)) = new_application {
1076                    let user_action = UserAction::Instantiate(context, argument);
1077                    self.run_user_action(
1078                        application_id,
1079                        user_action,
1080                        context.refund_grant_to(),
1081                        None,
1082                    )
1083                    .await?;
1084                }
1085            }
1086            Operation::User {
1087                application_id,
1088                bytes,
1089            } => {
1090                self.run_user_action(
1091                    application_id,
1092                    UserAction::Operation(context, bytes),
1093                    context.refund_grant_to(),
1094                    None,
1095                )
1096                .await?;
1097            }
1098        }
1099        self.process_subscriptions(context.into()).await?;
1100        Ok(())
1101    }
1102
1103    #[instrument(skip_all, fields(
1104        chain_id = %context.chain_id,
1105        block_height = %context.height,
1106        origin = %context.origin,
1107        is_bouncing = %context.is_bouncing,
1108        message_type = %message.as_ref(),
1109    ))]
1110    pub async fn execute_message(
1111        &mut self,
1112        context: MessageContext,
1113        message: Message,
1114        grant: Option<&mut Amount>,
1115    ) -> Result<(), ExecutionError> {
1116        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1117        match message {
1118            Message::System(message) => {
1119                let outcome = self.state.system.execute_message(context, message).await?;
1120                self.txn_tracker.add_outgoing_messages(outcome);
1121            }
1122            Message::User {
1123                application_id,
1124                bytes,
1125            } => {
1126                self.run_user_action(
1127                    application_id,
1128                    UserAction::Message(context, bytes),
1129                    context.refund_grant_to,
1130                    grant,
1131                )
1132                .await?;
1133            }
1134        }
1135        self.process_subscriptions(context.into()).await?;
1136        Ok(())
1137    }
1138
1139    pub fn bounce_message(
1140        &mut self,
1141        context: MessageContext,
1142        grant: Amount,
1143        message: Message,
1144    ) -> Result<(), ExecutionError> {
1145        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1146        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1147            destination: context.origin,
1148            authenticated_owner: context.authenticated_owner,
1149            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1150            grant,
1151            kind: MessageKind::Bouncing,
1152            message,
1153        });
1154        Ok(())
1155    }
1156
1157    pub fn send_refund(
1158        &mut self,
1159        context: MessageContext,
1160        amount: Amount,
1161    ) -> Result<(), ExecutionError> {
1162        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1163        if amount.is_zero() {
1164            return Ok(());
1165        }
1166        let Some(account) = context.refund_grant_to else {
1167            return Err(ExecutionError::InternalError(
1168                "Messages with grants should have a non-empty `refund_grant_to`",
1169            ));
1170        };
1171        let message = SystemMessage::Credit {
1172            amount,
1173            source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1174            target: account.owner,
1175        };
1176        self.txn_tracker.add_outgoing_message(
1177            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1178        );
1179        Ok(())
1180    }
1181
1182    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1183    ///
1184    /// Ensures that the response does not exceed the provided `size_limit`.
1185    async fn receive_http_response(
1186        response: reqwest::Response,
1187        size_limit: u64,
1188    ) -> Result<http::Response, ExecutionError> {
1189        let status = response.status().as_u16();
1190        let maybe_content_length = response.content_length();
1191
1192        let headers = response
1193            .headers()
1194            .iter()
1195            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1196            .collect::<Vec<_>>();
1197
1198        let total_header_size = headers
1199            .iter()
1200            .map(|header| (header.name.len() + header.value.len()) as u64)
1201            .sum();
1202
1203        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1204            ExecutionError::HttpResponseSizeLimitExceeded {
1205                limit: size_limit,
1206                size: total_header_size,
1207            },
1208        )?;
1209
1210        if let Some(content_length) = maybe_content_length {
1211            if content_length > remaining_bytes {
1212                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1213                    limit: size_limit,
1214                    size: content_length + total_header_size,
1215                });
1216            }
1217        }
1218
1219        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
1220        let mut body_stream = response.bytes_stream();
1221
1222        while let Some(bytes) = body_stream.next().await.transpose()? {
1223            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1224                ExecutionError::HttpResponseSizeLimitExceeded {
1225                    limit: size_limit,
1226                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1227                },
1228            )?;
1229
1230            body.extend(&bytes);
1231        }
1232
1233        Ok(http::Response {
1234            status,
1235            headers,
1236            body,
1237        })
1238    }
1239}
1240
1241/// Requests to the execution state.
1242#[derive(Debug, strum::AsRefStr)]
1243pub enum ExecutionRequest {
1244    #[cfg(not(web))]
1245    LoadContract {
1246        id: ApplicationId,
1247        #[debug(skip)]
1248        callback: Sender<(UserContractCode, ApplicationDescription)>,
1249    },
1250
1251    #[cfg(not(web))]
1252    LoadService {
1253        id: ApplicationId,
1254        #[debug(skip)]
1255        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1256    },
1257
1258    ChainBalance {
1259        #[debug(skip)]
1260        callback: Sender<Amount>,
1261    },
1262
1263    OwnerBalance {
1264        owner: AccountOwner,
1265        #[debug(skip)]
1266        callback: Sender<Amount>,
1267    },
1268
1269    OwnerBalances {
1270        #[debug(skip)]
1271        callback: Sender<Vec<(AccountOwner, Amount)>>,
1272    },
1273
1274    BalanceOwners {
1275        #[debug(skip)]
1276        callback: Sender<Vec<AccountOwner>>,
1277    },
1278
1279    Allowance {
1280        owner: AccountOwner,
1281        spender: AccountOwner,
1282        #[debug(skip)]
1283        callback: Sender<Amount>,
1284    },
1285
1286    Allowances {
1287        #[debug(skip)]
1288        callback: Sender<Vec<(AccountOwner, AccountOwner, Amount)>>,
1289    },
1290
1291    Transfer {
1292        source: AccountOwner,
1293        destination: Account,
1294        amount: Amount,
1295        #[debug(skip_if = Option::is_none)]
1296        signer: Option<AccountOwner>,
1297        application_id: ApplicationId,
1298        #[debug(skip)]
1299        callback: Sender<()>,
1300    },
1301
1302    Claim {
1303        source: Account,
1304        destination: Account,
1305        amount: Amount,
1306        #[debug(skip_if = Option::is_none)]
1307        signer: Option<AccountOwner>,
1308        application_id: ApplicationId,
1309        #[debug(skip)]
1310        callback: Sender<()>,
1311    },
1312
1313    Approve {
1314        owner: AccountOwner,
1315        spender: AccountOwner,
1316        amount: Amount,
1317        #[debug(skip_if = Option::is_none)]
1318        signer: Option<AccountOwner>,
1319        application_id: ApplicationId,
1320        #[debug(skip)]
1321        callback: Sender<()>,
1322    },
1323
1324    TransferFrom {
1325        owner: AccountOwner,
1326        spender: AccountOwner,
1327        destination: Account,
1328        amount: Amount,
1329        #[debug(skip_if = Option::is_none)]
1330        signer: Option<AccountOwner>,
1331        application_id: ApplicationId,
1332        #[debug(skip)]
1333        callback: Sender<()>,
1334    },
1335
1336    SystemTimestamp {
1337        #[debug(skip)]
1338        callback: Sender<Timestamp>,
1339    },
1340
1341    ChainOwnership {
1342        #[debug(skip)]
1343        callback: Sender<ChainOwnership>,
1344    },
1345
1346    ApplicationPermissions {
1347        #[debug(skip)]
1348        callback: Sender<ApplicationPermissions>,
1349    },
1350
1351    ReadApplicationDescription {
1352        application_id: ApplicationId,
1353        #[debug(skip)]
1354        callback: Sender<ApplicationDescription>,
1355    },
1356
1357    ReadValueBytes {
1358        id: ApplicationId,
1359        #[debug(with = hex_debug)]
1360        key: Vec<u8>,
1361        #[debug(skip)]
1362        callback: Sender<Option<Vec<u8>>>,
1363    },
1364
1365    ContainsKey {
1366        id: ApplicationId,
1367        key: Vec<u8>,
1368        #[debug(skip)]
1369        callback: Sender<bool>,
1370    },
1371
1372    ContainsKeys {
1373        id: ApplicationId,
1374        #[debug(with = hex_vec_debug)]
1375        keys: Vec<Vec<u8>>,
1376        callback: Sender<Vec<bool>>,
1377    },
1378
1379    ReadMultiValuesBytes {
1380        id: ApplicationId,
1381        #[debug(with = hex_vec_debug)]
1382        keys: Vec<Vec<u8>>,
1383        #[debug(skip)]
1384        callback: Sender<Vec<Option<Vec<u8>>>>,
1385    },
1386
1387    FindKeysByPrefix {
1388        id: ApplicationId,
1389        #[debug(with = hex_debug)]
1390        key_prefix: Vec<u8>,
1391        #[debug(skip)]
1392        callback: Sender<Vec<Vec<u8>>>,
1393    },
1394
1395    FindKeyValuesByPrefix {
1396        id: ApplicationId,
1397        #[debug(with = hex_debug)]
1398        key_prefix: Vec<u8>,
1399        #[debug(skip)]
1400        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1401    },
1402
1403    WriteBatch {
1404        id: ApplicationId,
1405        batch: Batch,
1406        #[debug(skip)]
1407        callback: Sender<()>,
1408    },
1409
1410    OpenChain {
1411        ownership: ChainOwnership,
1412        #[debug(skip_if = Amount::is_zero)]
1413        balance: Amount,
1414        parent_id: ChainId,
1415        block_height: BlockHeight,
1416        application_permissions: ApplicationPermissions,
1417        timestamp: Timestamp,
1418        #[debug(skip)]
1419        callback: Sender<ChainId>,
1420    },
1421
1422    CloseChain {
1423        application_id: ApplicationId,
1424        #[debug(skip)]
1425        callback: Sender<Result<(), ExecutionError>>,
1426    },
1427
1428    ChangeOwnership {
1429        application_id: ApplicationId,
1430        ownership: ChainOwnership,
1431        #[debug(skip)]
1432        callback: Sender<Result<(), ExecutionError>>,
1433    },
1434
1435    ChangeApplicationPermissions {
1436        application_id: ApplicationId,
1437        application_permissions: ApplicationPermissions,
1438        #[debug(skip)]
1439        callback: Sender<Result<(), ExecutionError>>,
1440    },
1441
1442    PeekApplicationIndex {
1443        #[debug(skip)]
1444        callback: Sender<u32>,
1445    },
1446
1447    CreateApplication {
1448        chain_id: ChainId,
1449        block_height: BlockHeight,
1450        module_id: ModuleId,
1451        parameters: Vec<u8>,
1452        required_application_ids: Vec<ApplicationId>,
1453        #[debug(skip)]
1454        callback: Sender<CreateApplicationResult>,
1455    },
1456
1457    PerformHttpRequest {
1458        request: http::Request,
1459        http_responses_are_oracle_responses: bool,
1460        #[debug(skip)]
1461        callback: Sender<http::Response>,
1462    },
1463
1464    ReadBlobContent {
1465        blob_id: BlobId,
1466        #[debug(skip)]
1467        callback: Sender<BlobContent>,
1468    },
1469
1470    AssertBlobExists {
1471        blob_id: BlobId,
1472        #[debug(skip)]
1473        callback: Sender<()>,
1474    },
1475
1476    Emit {
1477        stream_id: StreamId,
1478        #[debug(with = hex_debug)]
1479        value: Vec<u8>,
1480        #[debug(skip)]
1481        callback: Sender<u32>,
1482    },
1483
1484    ReadEvent {
1485        event_id: EventId,
1486        callback: oneshot::Sender<Vec<u8>>,
1487    },
1488
1489    SubscribeToEvents {
1490        chain_id: ChainId,
1491        stream_id: StreamId,
1492        subscriber_app_id: ApplicationId,
1493        #[debug(skip)]
1494        callback: Sender<()>,
1495    },
1496
1497    UnsubscribeFromEvents {
1498        chain_id: ChainId,
1499        stream_id: StreamId,
1500        subscriber_app_id: ApplicationId,
1501        #[debug(skip)]
1502        callback: Sender<()>,
1503    },
1504
1505    GetApplicationPermissions {
1506        #[debug(skip)]
1507        callback: Sender<ApplicationPermissions>,
1508    },
1509
1510    QueryServiceOracle {
1511        deadline: Option<Instant>,
1512        application_id: ApplicationId,
1513        next_block_height: BlockHeight,
1514        query: Vec<u8>,
1515        #[debug(skip)]
1516        callback: Sender<Vec<u8>>,
1517    },
1518
1519    AddOutgoingMessage {
1520        message: crate::OutgoingMessage,
1521        #[debug(skip)]
1522        callback: Sender<()>,
1523    },
1524
1525    SetLocalTime {
1526        local_time: Timestamp,
1527        #[debug(skip)]
1528        callback: Sender<()>,
1529    },
1530
1531    AssertBefore {
1532        timestamp: Timestamp,
1533        #[debug(skip)]
1534        callback: Sender<Result<(), ExecutionError>>,
1535    },
1536
1537    AddCreatedBlob {
1538        blob: crate::Blob,
1539        #[debug(skip)]
1540        callback: Sender<()>,
1541    },
1542
1543    ValidationRound {
1544        round: Option<u32>,
1545        #[debug(skip)]
1546        callback: Sender<Option<u32>>,
1547    },
1548
1549    TotalStorageSize {
1550        application: ApplicationId,
1551        #[debug(skip)]
1552        callback: Sender<(u32, u32)>,
1553    },
1554
1555    AllowApplicationLogs {
1556        #[debug(skip)]
1557        callback: Sender<bool>,
1558    },
1559
1560    /// Log message from contract execution (fire-and-forget, no callback needed).
1561    #[cfg(web)]
1562    Log {
1563        message: String,
1564        level: tracing::log::Level,
1565    },
1566}