Skip to main content

linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::{
7    collections::{BTreeMap, BTreeSet},
8    sync::Arc,
9};
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16    data_types::{
17        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, OracleResponse,
18        StreamUpdate, Timestamp,
19    },
20    ensure, hex_debug, hex_vec_debug, http,
21    identifiers::{
22        Account, AccountOwner, BlobId, BlobType, ChainId, EventId, GenericApplicationId,
23        OwnerSpender, StreamId,
24    },
25    ownership::ChainOwnership,
26    time::Instant,
27};
28use linera_views::{batch::Batch, context::Context, views::View};
29use oneshot::Sender;
30use reqwest::{header::HeaderMap, Client, Url};
31use tracing::{info_span, instrument, Instrument as _};
32
33use crate::{
34    execution::UserAction,
35    runtime::ContractSyncRuntime,
36    system::{CreateApplicationResult, OpenChainConfig},
37    util::{OracleResponseExt as _, RespondExt as _},
38    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
39    ExecutionStateView, JsVec, Message, MessageContext, MessageKind, ModuleId, Operation,
40    OperationContext, OutgoingMessage, ProcessStreamsContext, QueryContext, QueryOutcome,
41    ResourceController, SystemMessage, SystemOperation, TransactionTracker, UserContractCode,
42    UserServiceCode,
43};
44
45/// Actor for handling requests to the execution state.
46pub struct ExecutionStateActor<'a, C> {
47    state: &'a mut ExecutionStateView<C>,
48    txn_tracker: &'a mut TransactionTracker,
49    resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
50}
51
52#[cfg(with_metrics)]
53mod metrics {
54    use std::sync::LazyLock;
55
56    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
57    use prometheus::HistogramVec;
58
59    /// Histogram of the latency to load a contract bytecode.
60    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
61        register_histogram_vec(
62            "load_contract_latency",
63            "Load contract latency",
64            &[],
65            exponential_bucket_latencies(250.0),
66        )
67    });
68
69    /// Histogram of the latency to load a service bytecode.
70    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
71        register_histogram_vec(
72            "load_service_latency",
73            "Load service latency",
74            &[],
75            exponential_bucket_latencies(250.0),
76        )
77    });
78}
79
80pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
81
82impl<'a, C> ExecutionStateActor<'a, C>
83where
84    C: Context + Clone + 'static,
85    C::Extra: ExecutionRuntimeContext,
86{
87    /// Creates a new execution state actor.
88    pub fn new(
89        state: &'a mut ExecutionStateView<C>,
90        txn_tracker: &'a mut TransactionTracker,
91        resource_controller: &'a mut ResourceController<Option<AccountOwner>>,
92    ) -> Self {
93        Self {
94            state,
95            txn_tracker,
96            resource_controller,
97        }
98    }
99
100    #[instrument(skip_all, fields(application_id = %id))]
101    pub(crate) async fn load_contract(
102        &mut self,
103        id: ApplicationId,
104    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
105        #[cfg(with_metrics)]
106        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
107        let blob_id = id.description_blob_id();
108        let description = match self.txn_tracker.get_blob_content(&blob_id) {
109            Some(blob) => bcs::from_bytes(blob.bytes())?,
110            None => {
111                self.state
112                    .system
113                    .describe_application(id, self.txn_tracker)
114                    .await?
115            }
116        };
117        let code = self
118            .state
119            .context()
120            .extra()
121            .get_user_contract(&description, self.txn_tracker)
122            .await?;
123        Ok((code, description))
124    }
125
126    pub(crate) async fn load_service(
127        &mut self,
128        id: ApplicationId,
129    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
130        #[cfg(with_metrics)]
131        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
132        let blob_id = id.description_blob_id();
133        let description = match self.txn_tracker.get_blob_content(&blob_id) {
134            Some(blob) => bcs::from_bytes(blob.bytes())?,
135            None => {
136                self.state
137                    .system
138                    .describe_application(id, self.txn_tracker)
139                    .await?
140            }
141        };
142        let code = self
143            .state
144            .context()
145            .extra()
146            .get_user_service(&description, self.txn_tracker)
147            .await?;
148        Ok((code, description))
149    }
150
151    // TODO(#1416): Support concurrent I/O.
152    #[instrument(
153        skip_all,
154        fields(request_type = %request.as_ref())
155    )]
156    pub(crate) async fn handle_request(
157        &mut self,
158        request: ExecutionRequest,
159    ) -> Result<(), ExecutionError> {
160        use ExecutionRequest::*;
161        match request {
162            #[cfg(not(web))]
163            LoadContract { id, callback } => {
164                let (code, description) = self.load_contract(id).await?;
165                callback.respond((code, description))
166            }
167            #[cfg(not(web))]
168            LoadService { id, callback } => {
169                let (code, description) = self.load_service(id).await?;
170                callback.respond((code, description))
171            }
172
173            ChainBalance { callback } => {
174                let balance = *self.state.system.balance.get();
175                callback.respond(balance);
176            }
177
178            OwnerBalance { owner, callback } => {
179                let balance = self
180                    .state
181                    .system
182                    .balances
183                    .get(&owner)
184                    .await?
185                    .unwrap_or_default();
186                callback.respond(balance);
187            }
188
189            OwnerBalances { callback } => {
190                callback.respond(self.state.system.balances.index_values().await?);
191            }
192
193            BalanceOwners { callback } => {
194                let owners = self.state.system.balances.indices().await?;
195                callback.respond(owners);
196            }
197
198            Allowance {
199                owner,
200                spender,
201                callback,
202            } => {
203                let owner_spender = OwnerSpender::new(owner, spender);
204                let allowance = self
205                    .state
206                    .system
207                    .allowances
208                    .get(&owner_spender)
209                    .await?
210                    .unwrap_or_default();
211                callback.respond(allowance);
212            }
213
214            Allowances { callback } => {
215                let entries: Vec<_> = self
216                    .state
217                    .system
218                    .allowances
219                    .index_values()
220                    .await?
221                    .into_iter()
222                    .map(|(os, amount)| (os.owner, os.spender, amount))
223                    .collect();
224                callback.respond(entries);
225            }
226
227            Transfer {
228                source,
229                destination,
230                amount,
231                signer,
232                application_id,
233                callback,
234            } => {
235                let maybe_message = self
236                    .state
237                    .system
238                    .transfer(signer, Some(application_id), source, destination, amount)
239                    .await?;
240                self.txn_tracker.add_outgoing_messages(maybe_message);
241                callback.respond(());
242            }
243
244            Claim {
245                source,
246                destination,
247                amount,
248                signer,
249                application_id,
250                callback,
251            } => {
252                let maybe_message = self
253                    .state
254                    .system
255                    .claim(
256                        signer,
257                        Some(application_id),
258                        source.owner,
259                        source.chain_id,
260                        destination,
261                        amount,
262                    )
263                    .await?;
264                self.txn_tracker.add_outgoing_messages(maybe_message);
265                callback.respond(());
266            }
267
268            Approve {
269                owner,
270                spender,
271                amount,
272                signer,
273                application_id,
274                callback,
275            } => {
276                self.state
277                    .system
278                    .approve(signer, Some(application_id), owner, spender, amount)
279                    .await?;
280                callback.respond(());
281            }
282
283            TransferFrom {
284                owner,
285                spender,
286                destination,
287                amount,
288                signer,
289                application_id,
290                callback,
291            } => {
292                let maybe_message = self
293                    .state
294                    .system
295                    .transfer_from(
296                        signer,
297                        Some(application_id),
298                        owner,
299                        spender,
300                        destination,
301                        amount,
302                    )
303                    .await?;
304                self.txn_tracker.add_outgoing_messages(maybe_message);
305                callback.respond(());
306            }
307
308            SystemTimestamp { callback } => {
309                let timestamp = *self.state.system.timestamp.get();
310                callback.respond(timestamp);
311            }
312
313            ChainOwnership { callback } => {
314                let ownership = self.state.system.ownership.get().await?.clone();
315                callback.respond(ownership);
316            }
317
318            ApplicationPermissions { callback } => {
319                let permissions = self
320                    .state
321                    .system
322                    .application_permissions
323                    .get()
324                    .await?
325                    .clone();
326                callback.respond(permissions);
327            }
328
329            ReadApplicationDescription {
330                application_id,
331                callback,
332            } => {
333                let blob_id = application_id.description_blob_id();
334                let description = match self.txn_tracker.get_blob_content(&blob_id) {
335                    Some(blob) => bcs::from_bytes(blob.bytes())?,
336                    None => {
337                        let blob_content = self.state.system.read_blob_content(blob_id).await?;
338                        self.state
339                            .system
340                            .blob_used(self.txn_tracker, blob_id)
341                            .await?;
342                        bcs::from_bytes(blob_content.bytes())?
343                    }
344                };
345                callback.respond(description);
346            }
347
348            ContainsKey { id, key, callback } => {
349                let view = self.state.users.try_load_entry(&id).await?;
350                let result = match view {
351                    Some(view) => view.contains_key(&key).await?,
352                    None => false,
353                };
354                callback.respond(result);
355            }
356
357            ContainsKeys { id, keys, callback } => {
358                let view = self.state.users.try_load_entry(&id).await?;
359                let result = match view {
360                    Some(view) => view.contains_keys(&keys).await?,
361                    None => vec![false; keys.len()],
362                };
363                callback.respond(result);
364            }
365
366            ReadMultiValuesBytes { id, keys, callback } => {
367                let view = self.state.users.try_load_entry(&id).await?;
368                let values = match view {
369                    Some(view) => view.multi_get(&keys).await?,
370                    None => vec![None; keys.len()],
371                };
372                callback.respond(values);
373            }
374
375            ReadValueBytes { id, key, callback } => {
376                let view = self.state.users.try_load_entry(&id).await?;
377                let result = match view {
378                    Some(view) => view.get(&key).await?,
379                    None => None,
380                };
381                callback.respond(result);
382            }
383
384            FindKeysByPrefix {
385                id,
386                key_prefix,
387                callback,
388            } => {
389                let view = self.state.users.try_load_entry(&id).await?;
390                let result = match view {
391                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
392                    None => Vec::new(),
393                };
394                callback.respond(result);
395            }
396
397            FindKeyValuesByPrefix {
398                id,
399                key_prefix,
400                callback,
401            } => {
402                let view = self.state.users.try_load_entry(&id).await?;
403                let result = match view {
404                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
405                    None => Vec::new(),
406                };
407                callback.respond(result);
408            }
409
410            WriteBatch {
411                id,
412                batch,
413                callback,
414            } => {
415                let mut view = self.state.users.try_load_entry_mut(&id).await?;
416                view.write_batch(batch)?;
417                callback.respond(());
418            }
419
420            OpenChain {
421                ownership,
422                balance,
423                parent_id,
424                block_height,
425                application_permissions,
426                timestamp,
427                callback,
428            } => {
429                let config = OpenChainConfig {
430                    ownership,
431                    balance,
432                    application_permissions,
433                };
434                let chain_id = self
435                    .state
436                    .system
437                    .open_chain(config, parent_id, block_height, timestamp, self.txn_tracker)
438                    .await?;
439                callback.respond(chain_id);
440            }
441
442            CloseChain {
443                application_id,
444                callback,
445            } => {
446                let app_permissions = self.state.system.application_permissions.get().await?;
447                if !app_permissions.can_manage_chain(&application_id) {
448                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
449                } else {
450                    self.state.system.close_chain();
451                    callback.respond(Ok(()));
452                }
453            }
454
455            ChangeOwnership {
456                application_id,
457                ownership,
458                callback,
459            } => {
460                let app_permissions = self.state.system.application_permissions.get().await?;
461                if !app_permissions.can_manage_chain(&application_id) {
462                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
463                } else {
464                    self.state.system.ownership.set(ownership);
465                    callback.respond(Ok(()));
466                }
467            }
468
469            ChangeApplicationPermissions {
470                application_id,
471                application_permissions,
472                callback,
473            } => {
474                let app_permissions = self.state.system.application_permissions.get().await?;
475                if !app_permissions.can_manage_chain(&application_id) {
476                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
477                } else {
478                    self.state
479                        .system
480                        .application_permissions
481                        .set(application_permissions);
482                    callback.respond(Ok(()));
483                }
484            }
485
486            PeekApplicationIndex { callback } => {
487                let index = self.txn_tracker.peek_application_index();
488                callback.respond(index)
489            }
490
491            CreateApplication {
492                chain_id,
493                block_height,
494                module_id,
495                parameters,
496                required_application_ids,
497                callback,
498            } => {
499                let create_application_result = self
500                    .state
501                    .system
502                    .create_application(
503                        chain_id,
504                        block_height,
505                        module_id,
506                        parameters,
507                        required_application_ids,
508                        self.txn_tracker,
509                    )
510                    .await?;
511                callback.respond(create_application_result);
512            }
513
514            PerformHttpRequest {
515                request,
516                http_responses_are_oracle_responses,
517                callback,
518            } => {
519                let system = &mut self.state.system;
520                let response = self
521                    .txn_tracker
522                    .oracle(|| async {
523                        let headers = request
524                            .headers
525                            .into_iter()
526                            .map(|http::Header { name, value }| {
527                                Ok((name.parse()?, value.try_into()?))
528                            })
529                            .collect::<Result<HeaderMap, ExecutionError>>()?;
530
531                        let url = Url::parse(&request.url)?;
532                        let host = url
533                            .host_str()
534                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
535
536                        let (_epoch, committee) = system
537                            .current_committee()
538                            .await?
539                            .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
540                        let allowed_hosts = &committee.policy().http_request_allow_list;
541
542                        ensure!(
543                            allowed_hosts.contains(host),
544                            ExecutionError::UnauthorizedHttpRequest(url)
545                        );
546
547                        let request = Client::new()
548                            .request(request.method.into(), url)
549                            .body(request.body)
550                            .headers(headers);
551                        #[cfg(not(web))]
552                        let request = request.timeout(linera_base::time::Duration::from_millis(
553                            committee.policy().http_request_timeout_ms,
554                        ));
555
556                        let response = request.send().await?;
557
558                        let mut response_size_limit =
559                            committee.policy().maximum_http_response_bytes;
560
561                        if http_responses_are_oracle_responses {
562                            response_size_limit = response_size_limit
563                                .min(committee.policy().maximum_oracle_response_bytes);
564                        }
565                        Ok(OracleResponse::Http(
566                            Self::receive_http_response(response, response_size_limit).await?,
567                        ))
568                    })
569                    .await?
570                    .to_http_response()?;
571                callback.respond(response);
572            }
573
574            ReadBlobContent { blob_id, callback } => {
575                let content = if let Some(content) = self.txn_tracker.get_blob_content(&blob_id) {
576                    content.clone()
577                } else {
578                    let content = self.state.system.read_blob_content(blob_id).await?;
579                    if blob_id.blob_type == BlobType::Data {
580                        self.resource_controller
581                            .with_state(&mut self.state.system)
582                            .await?
583                            .track_blob_read(content.bytes().len() as u64)?;
584                    }
585                    self.state
586                        .system
587                        .blob_used(self.txn_tracker, blob_id)
588                        .await?;
589                    content
590                };
591                callback.respond(content)
592            }
593
594            AssertBlobExists { blob_id, callback } => {
595                self.state.system.assert_blob_exists(blob_id).await?;
596                // Treating this as reading a size-0 blob for fee purposes.
597                if blob_id.blob_type == BlobType::Data {
598                    self.resource_controller
599                        .with_state(&mut self.state.system)
600                        .await?
601                        .track_blob_read(0)?;
602                }
603                let is_new = self
604                    .state
605                    .system
606                    .blob_used(self.txn_tracker, blob_id)
607                    .await?;
608                if is_new {
609                    self.txn_tracker
610                        .replay_oracle_response(OracleResponse::Blob(blob_id))?;
611                }
612                callback.respond(());
613            }
614
615            Emit {
616                stream_id,
617                value,
618                callback,
619            } => {
620                let count = self
621                    .state
622                    .system
623                    .stream_event_counts
624                    .get_mut_or_default(&stream_id)
625                    .await?;
626                let index = *count;
627                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
628                self.resource_controller
629                    .with_state(&mut self.state.system)
630                    .await?
631                    .track_event_published(&value)?;
632                self.txn_tracker.add_event(stream_id, index, value);
633                callback.respond(index)
634            }
635
636            ReadEvent { event_id, callback } => {
637                let context = self.state.context();
638                let extra = context.extra();
639                let event = self
640                    .txn_tracker
641                    .oracle(|| async {
642                        let event = extra
643                            .get_event(event_id.clone())
644                            .await?
645                            .ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
646                        Ok(OracleResponse::Event(
647                            event_id.clone(),
648                            Arc::unwrap_or_clone(event),
649                        ))
650                    })
651                    .await?
652                    .to_event(&event_id)?;
653                self.resource_controller
654                    .with_state(&mut self.state.system)
655                    .await?
656                    .track_event_read(event.len() as u64)?;
657                callback.respond(event);
658            }
659
660            SubscribeToEvents {
661                chain_id,
662                stream_id,
663                subscriber_app_id,
664                callback,
665            } => {
666                let subscriptions = self
667                    .state
668                    .system
669                    .event_subscriptions
670                    .get_mut_or_default(&(chain_id, stream_id.clone()))
671                    .await?;
672                let next_index = match subscriptions.applications.entry(subscriber_app_id) {
673                    std::collections::btree_map::Entry::Vacant(entry) => {
674                        entry.insert(0);
675                        subscriptions.min_next_index = 0;
676                        0
677                    }
678                    std::collections::btree_map::Entry::Occupied(entry) => *entry.get(),
679                };
680                self.txn_tracker.add_stream_to_process(
681                    subscriber_app_id,
682                    chain_id,
683                    stream_id,
684                    0,
685                    next_index,
686                );
687                callback.respond(());
688            }
689
690            UnsubscribeFromEvents {
691                chain_id,
692                stream_id,
693                subscriber_app_id,
694                callback,
695            } => {
696                let key = (chain_id, stream_id.clone());
697                let subscriptions = self
698                    .state
699                    .system
700                    .event_subscriptions
701                    .get_mut_or_default(&key)
702                    .await?;
703                subscriptions.applications.remove(&subscriber_app_id);
704                if subscriptions.applications.is_empty() {
705                    self.state.system.event_subscriptions.remove(&key)?;
706                } else {
707                    subscriptions.recalculate_min();
708                }
709                if let crate::GenericApplicationId::User(app_id) = stream_id.application_id {
710                    self.txn_tracker
711                        .remove_stream_to_process(app_id, chain_id, stream_id);
712                }
713                callback.respond(());
714            }
715
716            GetApplicationPermissions { callback } => {
717                let app_permissions = self.state.system.application_permissions.get().await?;
718                callback.respond(app_permissions.clone());
719            }
720
721            QueryServiceOracle {
722                deadline,
723                application_id,
724                next_block_height,
725                query,
726                callback,
727            } => {
728                let state = &mut self.state;
729                let local_time = self.txn_tracker.local_time();
730                let created_blobs = self.txn_tracker.created_blobs().clone();
731                let bytes = self
732                    .txn_tracker
733                    .oracle(|| async {
734                        let context = QueryContext {
735                            chain_id: state.context().extra().chain_id(),
736                            next_block_height,
737                            local_time,
738                        };
739                        let QueryOutcome {
740                            response,
741                            operations,
742                        } = Box::pin(state.query_user_application_with_deadline(
743                            application_id,
744                            context,
745                            query,
746                            deadline,
747                            created_blobs,
748                        ))
749                        .await?;
750                        ensure!(
751                            operations.is_empty(),
752                            ExecutionError::ServiceOracleQueryOperations(operations)
753                        );
754                        Ok(OracleResponse::Service(response))
755                    })
756                    .await?
757                    .to_service_response()?;
758                callback.respond(bytes);
759            }
760
761            AddOutgoingMessage { message, callback } => {
762                self.txn_tracker.add_outgoing_message(message);
763                callback.respond(());
764            }
765
766            SetLocalTime {
767                local_time,
768                callback,
769            } => {
770                self.txn_tracker.set_local_time(local_time);
771                callback.respond(());
772            }
773
774            AssertBefore {
775                timestamp,
776                callback,
777            } => {
778                let result = if !self
779                    .txn_tracker
780                    .replay_oracle_response(OracleResponse::Assert)?
781                {
782                    // There are no recorded oracle responses, so we check the local time.
783                    let local_time = self.txn_tracker.local_time();
784                    if local_time >= timestamp {
785                        Err(ExecutionError::AssertBefore {
786                            timestamp,
787                            local_time,
788                        })
789                    } else {
790                        Ok(())
791                    }
792                } else {
793                    Ok(())
794                };
795                callback.respond(result);
796            }
797
798            AddCreatedBlob { blob, callback } => {
799                if self.resource_controller.is_free {
800                    self.txn_tracker.mark_blob_free(blob.id());
801                }
802                self.txn_tracker.add_created_blob(blob);
803                callback.respond(());
804            }
805
806            ValidationRound { round, callback } => {
807                let validation_round = self
808                    .txn_tracker
809                    .oracle(|| async { Ok(OracleResponse::Round(round)) })
810                    .await?
811                    .to_round()?;
812                callback.respond(validation_round);
813            }
814
815            HasEmptyStorage {
816                application,
817                callback,
818            } => {
819                let view = self.state.users.try_load_entry(&application).await?;
820                let result = match view {
821                    Some(view) => view.count().await? == 0,
822                    None => true,
823                };
824                callback.respond(result);
825            }
826
827            AllowApplicationLogs { callback } => {
828                let allow = self
829                    .state
830                    .context()
831                    .extra()
832                    .execution_runtime_config()
833                    .allow_application_logs;
834                callback.respond(allow);
835            }
836
837            #[cfg(web)]
838            Log { message, level } => match level {
839                tracing::log::Level::Trace | tracing::log::Level::Debug => {
840                    tracing::debug!(target: "user_application_log", message = %message);
841                }
842                tracing::log::Level::Info => {
843                    tracing::info!(target: "user_application_log", message = %message);
844                }
845                tracing::log::Level::Warn => {
846                    tracing::warn!(target: "user_application_log", message = %message);
847                }
848                tracing::log::Level::Error => {
849                    tracing::error!(target: "user_application_log", message = %message);
850                }
851            },
852        }
853
854        Ok(())
855    }
856
857    /// Calls `process_streams` for all applications that are subscribed to streams with new
858    /// events or that have new subscriptions.
859    #[instrument(skip_all)]
860    async fn process_subscriptions(
861        &mut self,
862        context: ProcessStreamsContext,
863    ) -> Result<(), ExecutionError> {
864        // Keep track of which streams we have already processed. This is to guard against
865        // applications unsubscribing and subscribing in the process_streams call itself.
866        let mut processed = BTreeSet::new();
867        loop {
868            let to_process = self
869                .txn_tracker
870                .take_streams_to_process()
871                .into_iter()
872                .filter_map(|(app_id, updates)| {
873                    let updates = updates
874                        .into_iter()
875                        .filter_map(|update| {
876                            if !processed.insert((
877                                app_id,
878                                update.chain_id,
879                                update.stream_id.clone(),
880                            )) {
881                                return None;
882                            }
883                            Some(update)
884                        })
885                        .collect::<Vec<_>>();
886                    if updates.is_empty() {
887                        return None;
888                    }
889                    Some((app_id, updates))
890                })
891                .collect::<BTreeMap<_, _>>();
892            if to_process.is_empty() {
893                return Ok(());
894            }
895            for (app_id, updates) in to_process {
896                self.run_user_action(
897                    app_id,
898                    UserAction::ProcessStreams(context, updates),
899                    None,
900                    None,
901                )
902                .await?;
903            }
904        }
905    }
906
907    /// Calls `summarize_events` for every application that has published events to one of its
908    /// streams since the previous checkpoint, then drops those streams' pre-checkpoint anchors.
909    ///
910    /// The work list is exactly the set of user streams currently in `previous_event_blocks`:
911    /// the previous checkpoint cleared the map, so an entry means the stream published a
912    /// summary at (or any event since) the previous checkpoint. Each application may emit a
913    /// fresh summary event; the block-level event-stream bookkeeping then re-anchors that
914    /// stream to the checkpoint height (with no recertification link to the now-unguaranteed
915    /// older blocks, since the map was cleared here). A stream whose application emits nothing
916    /// stays dropped and is effectively closed: it won't be summarized again unless it
917    /// publishes new events.
918    async fn summarize_events_at_checkpoint(
919        &mut self,
920        context: OperationContext,
921    ) -> Result<(), ExecutionError> {
922        let mut updates_by_app = BTreeMap::<ApplicationId, Vec<StreamUpdate>>::new();
923        for stream_id in self.state.previous_event_blocks.indices().await? {
924            let GenericApplicationId::User(application_id) = stream_id.application_id else {
925                continue;
926            };
927            let next_index = self
928                .state
929                .system
930                .stream_event_counts
931                .get(&stream_id)
932                .await?
933                .unwrap_or(0);
934            // A summary is an absolute-state snapshot, so the application is not handed an
935            // incremental range to fold in; `previous_index` is left at 0.
936            updates_by_app
937                .entry(application_id)
938                .or_default()
939                .push(StreamUpdate {
940                    chain_id: context.chain_id,
941                    stream_id,
942                    previous_index: 0,
943                    next_index,
944                });
945        }
946
947        // Drop every pre-checkpoint anchor. Only user streams are present, since
948        // `prepare_checkpoint` rejects chains that published system events.
949        self.state.previous_event_blocks.clear();
950
951        let process_context = ProcessStreamsContext::from(context);
952        for (application_id, updates) in updates_by_app {
953            self.run_user_action(
954                application_id,
955                UserAction::SummarizeEvents(process_context, updates),
956                None,
957                None,
958            )
959            .await?;
960        }
961        Ok(())
962    }
963
964    pub(crate) async fn run_user_action(
965        &mut self,
966        application_id: ApplicationId,
967        action: UserAction,
968        refund_grant_to: Option<Account>,
969        grant: Option<&mut Amount>,
970    ) -> Result<(), ExecutionError> {
971        self.run_user_action_with_runtime(application_id, action, refund_grant_to, grant)
972            .await
973    }
974
975    // TODO(#5034): unify with `contract_and_dependencies`
976    pub(crate) async fn service_and_dependencies(
977        &mut self,
978        application: ApplicationId,
979    ) -> Result<(Vec<UserServiceCode>, Vec<ApplicationDescription>), ExecutionError> {
980        // cyclic futures are illegal so we need to either box the frames or keep our own
981        // stack
982        let mut stack = vec![application];
983        let mut codes = vec![];
984        let mut descriptions = vec![];
985
986        while let Some(id) = stack.pop() {
987            let (code, description) = self.load_service(id).await?;
988            stack.extend(description.required_application_ids.iter().rev().copied());
989            codes.push(code);
990            descriptions.push(description);
991        }
992
993        codes.reverse();
994        descriptions.reverse();
995
996        Ok((codes, descriptions))
997    }
998
999    // TODO(#5034): unify with `service_and_dependencies`
1000    #[instrument(skip_all, fields(application_id = %application))]
1001    async fn contract_and_dependencies(
1002        &mut self,
1003        application: ApplicationId,
1004    ) -> Result<(Vec<UserContractCode>, Vec<ApplicationDescription>), ExecutionError> {
1005        // cyclic futures are illegal so we need to either box the frames or keep our own
1006        // stack
1007        let mut stack = vec![application];
1008        let mut codes = vec![];
1009        let mut descriptions = vec![];
1010
1011        while let Some(id) = stack.pop() {
1012            let (code, description) = self.load_contract(id).await?;
1013            stack.extend(description.required_application_ids.iter().rev().copied());
1014            codes.push(code);
1015            descriptions.push(description);
1016        }
1017
1018        codes.reverse();
1019        descriptions.reverse();
1020
1021        Ok((codes, descriptions))
1022    }
1023
1024    #[instrument(skip_all, fields(application_id = %application_id))]
1025    async fn run_user_action_with_runtime(
1026        &mut self,
1027        application_id: ApplicationId,
1028        action: UserAction,
1029        refund_grant_to: Option<Account>,
1030        grant: Option<&mut Amount>,
1031    ) -> Result<(), ExecutionError> {
1032        let chain_id = self.state.context().extra().chain_id();
1033        let mut cloned_grant = grant.as_ref().map(|x| **x);
1034        let initial_balance = self
1035            .resource_controller
1036            .with_state_and_grant(&mut self.state.system, cloned_grant.as_mut())
1037            .await?
1038            .balance()?;
1039        let mut controller = ResourceController::new(
1040            self.resource_controller.policy().clone(),
1041            self.resource_controller.tracker,
1042            initial_balance,
1043        );
1044        let is_free = matches!(
1045            &action,
1046            UserAction::Message(..)
1047                | UserAction::ProcessStreams(..)
1048                | UserAction::SummarizeEvents(..)
1049        ) && self
1050            .resource_controller
1051            .policy()
1052            .is_free_app(&application_id);
1053        controller.is_free = is_free;
1054        self.resource_controller.is_free = is_free;
1055        let (execution_state_sender, mut execution_state_receiver) =
1056            futures::channel::mpsc::unbounded();
1057
1058        let (codes, descriptions): (Vec<_>, Vec<_>) =
1059            self.contract_and_dependencies(application_id).await?;
1060
1061        let allow_application_logs = self
1062            .state
1063            .context()
1064            .extra()
1065            .execution_runtime_config()
1066            .allow_application_logs;
1067
1068        let contract_runtime_task = self
1069            .state
1070            .context()
1071            .extra()
1072            .thread_pool()
1073            .run_send(JsVec(codes), move |codes| async move {
1074                let runtime = ContractSyncRuntime::new(
1075                    execution_state_sender,
1076                    chain_id,
1077                    refund_grant_to,
1078                    controller,
1079                    &action,
1080                    allow_application_logs,
1081                );
1082
1083                for (code, description) in codes.0.into_iter().zip(descriptions) {
1084                    runtime.preload_contract(ApplicationId::from(&description), code, description);
1085                }
1086
1087                runtime.run_action(application_id, chain_id, action)
1088            })
1089            .await;
1090
1091        async {
1092            while let Some(request) = execution_state_receiver.next().await {
1093                self.handle_request(request).await?;
1094            }
1095            Ok::<(), ExecutionError>(())
1096        }
1097        .instrument(info_span!("handle_runtime_requests"))
1098        .await?;
1099
1100        let (result, controller) = contract_runtime_task.await??;
1101
1102        self.resource_controller.is_free = false;
1103
1104        self.txn_tracker.add_operation_result(result);
1105
1106        self.resource_controller
1107            .with_state_and_grant(&mut self.state.system, grant)
1108            .await?
1109            .merge_balance(initial_balance, controller.balance()?)?;
1110        self.resource_controller.tracker = controller.tracker;
1111
1112        Ok(())
1113    }
1114
1115    #[instrument(skip_all, fields(
1116        chain_id = %context.chain_id,
1117        block_height = %context.height,
1118        operation_type = %operation.as_ref(),
1119    ))]
1120    /// Executes an operation, dispatching to the system or to a user application.
1121    pub async fn execute_operation(
1122        &mut self,
1123        context: OperationContext,
1124        operation: Operation,
1125    ) -> Result<(), ExecutionError> {
1126        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1127        match operation {
1128            Operation::System(op) => match *op {
1129                SystemOperation::Checkpoint => {
1130                    let prepared = self.txn_tracker.take_prepared_checkpoint().ok_or(
1131                        ExecutionError::CheckpointPreconditionFailed(
1132                            "Checkpoint operation reached the actor without prepared inputs; \
1133                             the chain-level pre-block hook must run prepare_checkpoint first",
1134                        ),
1135                    )?;
1136                    self.state
1137                        .apply_checkpoint(prepared, self.txn_tracker)
1138                        .await?;
1139                    self.summarize_events_at_checkpoint(context).await?;
1140                }
1141                op => {
1142                    let new_application = self
1143                        .state
1144                        .system
1145                        .execute_operation(context, op, self.txn_tracker, self.resource_controller)
1146                        .await?;
1147                    if let Some((application_id, argument)) = new_application {
1148                        let user_action = UserAction::Instantiate(context, argument);
1149                        self.run_user_action(
1150                            application_id,
1151                            user_action,
1152                            context.refund_grant_to(),
1153                            None,
1154                        )
1155                        .await?;
1156                    }
1157                }
1158            },
1159            Operation::User {
1160                application_id,
1161                bytes,
1162            } => {
1163                self.run_user_action(
1164                    application_id,
1165                    UserAction::Operation(context, bytes),
1166                    context.refund_grant_to(),
1167                    None,
1168                )
1169                .await?;
1170            }
1171        }
1172        self.process_subscriptions(context.into()).await?;
1173        Ok(())
1174    }
1175
1176    #[instrument(skip_all, fields(
1177        chain_id = %context.chain_id,
1178        block_height = %context.height,
1179        origin = %context.origin,
1180        is_bouncing = %context.is_bouncing,
1181        message_type = %message.as_ref(),
1182    ))]
1183    /// Executes an incoming message, dispatching to the system or to a user application.
1184    pub async fn execute_message(
1185        &mut self,
1186        context: MessageContext,
1187        message: Message,
1188        grant: Option<&mut Amount>,
1189    ) -> Result<(), ExecutionError> {
1190        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1191        match message {
1192            Message::System(message) => {
1193                let outcome = self.state.system.execute_message(context, message).await?;
1194                self.txn_tracker.add_outgoing_messages(outcome);
1195            }
1196            Message::User {
1197                application_id,
1198                bytes,
1199            } => {
1200                self.run_user_action(
1201                    application_id,
1202                    UserAction::Message(context, bytes),
1203                    context.refund_grant_to,
1204                    grant,
1205                )
1206                .await?;
1207            }
1208        }
1209        self.process_subscriptions(context.into()).await?;
1210        Ok(())
1211    }
1212
1213    /// Bounces a message back to its sender, returning any attached grant.
1214    pub fn bounce_message(
1215        &mut self,
1216        context: MessageContext,
1217        grant: Amount,
1218        message: Message,
1219    ) -> Result<(), ExecutionError> {
1220        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1221        self.txn_tracker.add_outgoing_message(OutgoingMessage {
1222            destination: context.origin,
1223            authenticated_owner: context.authenticated_owner,
1224            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
1225            grant,
1226            kind: MessageKind::Bouncing,
1227            message,
1228        });
1229        Ok(())
1230    }
1231
1232    /// Sends a refund of the given amount to the account designated to receive grant refunds.
1233    pub fn send_refund(
1234        &mut self,
1235        context: MessageContext,
1236        amount: Amount,
1237    ) -> Result<(), ExecutionError> {
1238        assert_eq!(context.chain_id, self.state.context().extra().chain_id());
1239        if amount.is_zero() {
1240            return Ok(());
1241        }
1242        let Some(account) = context.refund_grant_to else {
1243            return Err(ExecutionError::InternalError(
1244                "Messages with grants should have a non-empty `refund_grant_to`",
1245            ));
1246        };
1247        let message = SystemMessage::Credit {
1248            amount,
1249            source: context.authenticated_owner.unwrap_or(AccountOwner::CHAIN),
1250            target: account.owner,
1251        };
1252        self.txn_tracker.add_outgoing_message(
1253            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
1254        );
1255        Ok(())
1256    }
1257
1258    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
1259    ///
1260    /// Ensures that the response does not exceed the provided `size_limit`.
1261    async fn receive_http_response(
1262        response: reqwest::Response,
1263        size_limit: u64,
1264    ) -> Result<http::Response, ExecutionError> {
1265        let status = response.status().as_u16();
1266        let maybe_content_length = response.content_length();
1267
1268        let headers = response
1269            .headers()
1270            .iter()
1271            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
1272            .collect::<Vec<_>>();
1273
1274        let total_header_size = headers
1275            .iter()
1276            .map(|header| (header.name.len() + header.value.len()) as u64)
1277            .sum();
1278
1279        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
1280            ExecutionError::HttpResponseSizeLimitExceeded {
1281                limit: size_limit,
1282                size: total_header_size,
1283            },
1284        )?;
1285
1286        if let Some(content_length) = maybe_content_length {
1287            if content_length > remaining_bytes {
1288                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
1289                    limit: size_limit,
1290                    size: content_length + total_header_size,
1291                });
1292            }
1293        }
1294
1295        let mut body = Vec::with_capacity(
1296            usize::try_from(maybe_content_length.unwrap_or(0)).unwrap_or(usize::MAX),
1297        );
1298        let mut body_stream = response.bytes_stream();
1299
1300        while let Some(bytes) = body_stream.next().await.transpose()? {
1301            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
1302                ExecutionError::HttpResponseSizeLimitExceeded {
1303                    limit: size_limit,
1304                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
1305                },
1306            )?;
1307
1308            body.extend(&bytes);
1309        }
1310
1311        Ok(http::Response {
1312            status,
1313            headers,
1314            body,
1315        })
1316    }
1317}
1318
1319/// Requests to the execution state.
1320#[derive(Debug, strum::AsRefStr)]
1321#[allow(missing_docs)]
1322pub enum ExecutionRequest {
1323    #[cfg(not(web))]
1324    LoadContract {
1325        id: ApplicationId,
1326        #[debug(skip)]
1327        callback: Sender<(UserContractCode, ApplicationDescription)>,
1328    },
1329
1330    #[cfg(not(web))]
1331    LoadService {
1332        id: ApplicationId,
1333        #[debug(skip)]
1334        callback: Sender<(UserServiceCode, ApplicationDescription)>,
1335    },
1336
1337    ChainBalance {
1338        #[debug(skip)]
1339        callback: Sender<Amount>,
1340    },
1341
1342    OwnerBalance {
1343        owner: AccountOwner,
1344        #[debug(skip)]
1345        callback: Sender<Amount>,
1346    },
1347
1348    OwnerBalances {
1349        #[debug(skip)]
1350        callback: Sender<Vec<(AccountOwner, Amount)>>,
1351    },
1352
1353    BalanceOwners {
1354        #[debug(skip)]
1355        callback: Sender<Vec<AccountOwner>>,
1356    },
1357
1358    Allowance {
1359        owner: AccountOwner,
1360        spender: AccountOwner,
1361        #[debug(skip)]
1362        callback: Sender<Amount>,
1363    },
1364
1365    Allowances {
1366        #[debug(skip)]
1367        callback: Sender<Vec<(AccountOwner, AccountOwner, Amount)>>,
1368    },
1369
1370    Transfer {
1371        source: AccountOwner,
1372        destination: Account,
1373        amount: Amount,
1374        #[debug(skip_if = Option::is_none)]
1375        signer: Option<AccountOwner>,
1376        application_id: ApplicationId,
1377        #[debug(skip)]
1378        callback: Sender<()>,
1379    },
1380
1381    Claim {
1382        source: Account,
1383        destination: Account,
1384        amount: Amount,
1385        #[debug(skip_if = Option::is_none)]
1386        signer: Option<AccountOwner>,
1387        application_id: ApplicationId,
1388        #[debug(skip)]
1389        callback: Sender<()>,
1390    },
1391
1392    Approve {
1393        owner: AccountOwner,
1394        spender: AccountOwner,
1395        amount: Amount,
1396        #[debug(skip_if = Option::is_none)]
1397        signer: Option<AccountOwner>,
1398        application_id: ApplicationId,
1399        #[debug(skip)]
1400        callback: Sender<()>,
1401    },
1402
1403    TransferFrom {
1404        owner: AccountOwner,
1405        spender: AccountOwner,
1406        destination: Account,
1407        amount: Amount,
1408        #[debug(skip_if = Option::is_none)]
1409        signer: Option<AccountOwner>,
1410        application_id: ApplicationId,
1411        #[debug(skip)]
1412        callback: Sender<()>,
1413    },
1414
1415    SystemTimestamp {
1416        #[debug(skip)]
1417        callback: Sender<Timestamp>,
1418    },
1419
1420    ChainOwnership {
1421        #[debug(skip)]
1422        callback: Sender<ChainOwnership>,
1423    },
1424
1425    ApplicationPermissions {
1426        #[debug(skip)]
1427        callback: Sender<ApplicationPermissions>,
1428    },
1429
1430    ReadApplicationDescription {
1431        application_id: ApplicationId,
1432        #[debug(skip)]
1433        callback: Sender<ApplicationDescription>,
1434    },
1435
1436    ReadValueBytes {
1437        id: ApplicationId,
1438        #[debug(with = hex_debug)]
1439        key: Vec<u8>,
1440        #[debug(skip)]
1441        callback: Sender<Option<Vec<u8>>>,
1442    },
1443
1444    ContainsKey {
1445        id: ApplicationId,
1446        key: Vec<u8>,
1447        #[debug(skip)]
1448        callback: Sender<bool>,
1449    },
1450
1451    ContainsKeys {
1452        id: ApplicationId,
1453        #[debug(with = hex_vec_debug)]
1454        keys: Vec<Vec<u8>>,
1455        callback: Sender<Vec<bool>>,
1456    },
1457
1458    ReadMultiValuesBytes {
1459        id: ApplicationId,
1460        #[debug(with = hex_vec_debug)]
1461        keys: Vec<Vec<u8>>,
1462        #[debug(skip)]
1463        callback: Sender<Vec<Option<Vec<u8>>>>,
1464    },
1465
1466    FindKeysByPrefix {
1467        id: ApplicationId,
1468        #[debug(with = hex_debug)]
1469        key_prefix: Vec<u8>,
1470        #[debug(skip)]
1471        callback: Sender<Vec<Vec<u8>>>,
1472    },
1473
1474    FindKeyValuesByPrefix {
1475        id: ApplicationId,
1476        #[debug(with = hex_debug)]
1477        key_prefix: Vec<u8>,
1478        #[debug(skip)]
1479        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
1480    },
1481
1482    WriteBatch {
1483        id: ApplicationId,
1484        batch: Batch,
1485        #[debug(skip)]
1486        callback: Sender<()>,
1487    },
1488
1489    OpenChain {
1490        ownership: ChainOwnership,
1491        #[debug(skip_if = Amount::is_zero)]
1492        balance: Amount,
1493        parent_id: ChainId,
1494        block_height: BlockHeight,
1495        application_permissions: ApplicationPermissions,
1496        timestamp: Timestamp,
1497        #[debug(skip)]
1498        callback: Sender<ChainId>,
1499    },
1500
1501    CloseChain {
1502        application_id: ApplicationId,
1503        #[debug(skip)]
1504        callback: Sender<Result<(), ExecutionError>>,
1505    },
1506
1507    ChangeOwnership {
1508        application_id: ApplicationId,
1509        ownership: ChainOwnership,
1510        #[debug(skip)]
1511        callback: Sender<Result<(), ExecutionError>>,
1512    },
1513
1514    ChangeApplicationPermissions {
1515        application_id: ApplicationId,
1516        application_permissions: ApplicationPermissions,
1517        #[debug(skip)]
1518        callback: Sender<Result<(), ExecutionError>>,
1519    },
1520
1521    PeekApplicationIndex {
1522        #[debug(skip)]
1523        callback: Sender<u32>,
1524    },
1525
1526    CreateApplication {
1527        chain_id: ChainId,
1528        block_height: BlockHeight,
1529        module_id: ModuleId,
1530        parameters: Vec<u8>,
1531        required_application_ids: Vec<ApplicationId>,
1532        #[debug(skip)]
1533        callback: Sender<CreateApplicationResult>,
1534    },
1535
1536    PerformHttpRequest {
1537        request: http::Request,
1538        http_responses_are_oracle_responses: bool,
1539        #[debug(skip)]
1540        callback: Sender<http::Response>,
1541    },
1542
1543    ReadBlobContent {
1544        blob_id: BlobId,
1545        #[debug(skip)]
1546        callback: Sender<BlobContent>,
1547    },
1548
1549    AssertBlobExists {
1550        blob_id: BlobId,
1551        #[debug(skip)]
1552        callback: Sender<()>,
1553    },
1554
1555    Emit {
1556        stream_id: StreamId,
1557        #[debug(with = hex_debug)]
1558        value: Vec<u8>,
1559        #[debug(skip)]
1560        callback: Sender<u32>,
1561    },
1562
1563    ReadEvent {
1564        event_id: EventId,
1565        callback: oneshot::Sender<Vec<u8>>,
1566    },
1567
1568    SubscribeToEvents {
1569        chain_id: ChainId,
1570        stream_id: StreamId,
1571        subscriber_app_id: ApplicationId,
1572        #[debug(skip)]
1573        callback: Sender<()>,
1574    },
1575
1576    UnsubscribeFromEvents {
1577        chain_id: ChainId,
1578        stream_id: StreamId,
1579        subscriber_app_id: ApplicationId,
1580        #[debug(skip)]
1581        callback: Sender<()>,
1582    },
1583
1584    GetApplicationPermissions {
1585        #[debug(skip)]
1586        callback: Sender<ApplicationPermissions>,
1587    },
1588
1589    QueryServiceOracle {
1590        deadline: Option<Instant>,
1591        application_id: ApplicationId,
1592        next_block_height: BlockHeight,
1593        query: Vec<u8>,
1594        #[debug(skip)]
1595        callback: Sender<Vec<u8>>,
1596    },
1597
1598    AddOutgoingMessage {
1599        message: crate::OutgoingMessage,
1600        #[debug(skip)]
1601        callback: Sender<()>,
1602    },
1603
1604    SetLocalTime {
1605        local_time: Timestamp,
1606        #[debug(skip)]
1607        callback: Sender<()>,
1608    },
1609
1610    AssertBefore {
1611        timestamp: Timestamp,
1612        #[debug(skip)]
1613        callback: Sender<Result<(), ExecutionError>>,
1614    },
1615
1616    AddCreatedBlob {
1617        blob: crate::Blob,
1618        #[debug(skip)]
1619        callback: Sender<()>,
1620    },
1621
1622    ValidationRound {
1623        round: Option<u32>,
1624        #[debug(skip)]
1625        callback: Sender<Option<u32>>,
1626    },
1627
1628    HasEmptyStorage {
1629        application: ApplicationId,
1630        #[debug(skip)]
1631        callback: Sender<bool>,
1632    },
1633
1634    AllowApplicationLogs {
1635        #[debug(skip)]
1636        callback: Sender<bool>,
1637    },
1638
1639    /// Log message from contract execution (fire-and-forget, no callback needed).
1640    #[cfg(web)]
1641    Log {
1642        message: String,
1643        level: tracing::log::Level,
1644    },
1645}