linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27use tracing::instrument;
28
29use crate::{
30    execution::UserAction,
31    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32    resources::ResourceController,
33    system::CreateApplicationResult,
34    util::{ReceiverExt, UnboundedSenderExt},
35    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
36    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
37    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
38    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
45pub trait WithContext {
46    type UserContext;
47    type Code;
48}
49
50impl WithContext for UserContractInstance {
51    type UserContext = Timestamp;
52    type Code = UserContractCode;
53}
54
55impl WithContext for UserServiceInstance {
56    type UserContext = ();
57    type Code = UserServiceCode;
58}
59
60#[cfg(test)]
61impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
62    type UserContext = ();
63    type Code = ();
64}
65
66#[derive(Debug)]
67pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
68
69pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
70
71pub struct ServiceSyncRuntime {
72    runtime: SyncRuntime<UserServiceInstance>,
73    current_context: QueryContext,
74}
75
76#[derive(Debug)]
77pub struct SyncRuntimeHandle<UserInstance: WithContext>(
78    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
79);
80
81pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
82pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
83
84/// Runtime data tracked during the execution of a transaction on the synchronous thread.
85#[derive(Debug)]
86pub struct SyncRuntimeInternal<UserInstance: WithContext> {
87    /// The current chain ID.
88    chain_id: ChainId,
89    /// The height of the next block that will be added to this chain. During operations
90    /// and messages, this is the current block height.
91    height: BlockHeight,
92    /// The current consensus round. Only available during block validation in multi-leader rounds.
93    round: Option<u32>,
94    /// The current message being executed, if there is one.
95    #[debug(skip_if = Option::is_none)]
96    executing_message: Option<ExecutingMessage>,
97
98    /// How to interact with the storage view of the execution state.
99    execution_state_sender: ExecutionStateSender,
100
101    /// If applications are being finalized.
102    ///
103    /// If [`true`], disables cross-application calls.
104    is_finalizing: bool,
105    /// Applications that need to be finalized.
106    applications_to_finalize: Vec<ApplicationId>,
107
108    /// Application instances loaded in this transaction.
109    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
110    /// Application instances loaded in this transaction.
111    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
112    /// The current stack of application descriptions.
113    call_stack: Vec<ApplicationStatus>,
114    /// The set of the IDs of the applications that are in the `call_stack`.
115    active_applications: HashSet<ApplicationId>,
116    /// The operations scheduled during this query.
117    scheduled_operations: Vec<Operation>,
118
119    /// Track application states based on views.
120    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
121
122    /// The deadline this runtime should finish executing.
123    ///
124    /// Used to limit the execution time of services running as oracles.
125    deadline: Option<Instant>,
126
127    /// Where to send a refund for the unused part of the grant after execution, if any.
128    #[debug(skip_if = Option::is_none)]
129    refund_grant_to: Option<Account>,
130    /// Controller to track fuel and storage consumption.
131    resource_controller: ResourceController,
132    /// Additional context for the runtime.
133    user_context: UserInstance::UserContext,
134    /// Whether contract log messages should be output.
135    allow_application_logs: bool,
136}
137
138/// The runtime status of an application.
139#[derive(Debug)]
140struct ApplicationStatus {
141    /// The caller application ID, if forwarded during the call.
142    caller_id: Option<ApplicationId>,
143    /// The application ID.
144    id: ApplicationId,
145    /// The application description.
146    description: ApplicationDescription,
147    /// The authenticated owner for the execution thread, if any.
148    signer: Option<AccountOwner>,
149}
150
151/// A loaded application instance.
152#[derive(Debug)]
153struct LoadedApplication<Instance> {
154    instance: Arc<Mutex<Instance>>,
155    description: ApplicationDescription,
156}
157
158impl<Instance> LoadedApplication<Instance> {
159    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
160    fn new(instance: Instance, description: ApplicationDescription) -> Self {
161        LoadedApplication {
162            instance: Arc::new(Mutex::new(instance)),
163            description,
164        }
165    }
166}
167
168impl<Instance> Clone for LoadedApplication<Instance> {
169    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
170    // bound
171    fn clone(&self) -> Self {
172        LoadedApplication {
173            instance: self.instance.clone(),
174            description: self.description.clone(),
175        }
176    }
177}
178
179#[derive(Debug)]
180enum Promise<T> {
181    Ready(T),
182    Pending(Receiver<T>),
183}
184
185impl<T> Promise<T> {
186    fn force(&mut self) -> Result<(), ExecutionError> {
187        if let Promise::Pending(receiver) = self {
188            let value = receiver
189                .recv_ref()
190                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
191            *self = Promise::Ready(value);
192        }
193        Ok(())
194    }
195
196    fn read(self) -> Result<T, ExecutionError> {
197        match self {
198            Promise::Pending(receiver) => {
199                let value = receiver.recv_response()?;
200                Ok(value)
201            }
202            Promise::Ready(value) => Ok(value),
203        }
204    }
205}
206
207/// Manages a set of pending queries returning values of type `T`.
208#[derive(Debug, Default)]
209struct QueryManager<T> {
210    /// The queries in progress.
211    pending_queries: BTreeMap<u32, Promise<T>>,
212    /// The number of queries ever registered so far. Used for the index of the next query.
213    query_count: u32,
214    /// The number of active queries.
215    active_query_count: u32,
216}
217
218impl<T> QueryManager<T> {
219    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
220        let id = self.query_count;
221        self.pending_queries.insert(id, Promise::Pending(receiver));
222        self.query_count = self
223            .query_count
224            .checked_add(1)
225            .ok_or(ArithmeticError::Overflow)?;
226        self.active_query_count = self
227            .active_query_count
228            .checked_add(1)
229            .ok_or(ArithmeticError::Overflow)?;
230        Ok(id)
231    }
232
233    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
234        let promise = self
235            .pending_queries
236            .remove(&id)
237            .ok_or(ExecutionError::InvalidPromise)?;
238        let value = promise.read()?;
239        self.active_query_count -= 1;
240        Ok(value)
241    }
242
243    fn force_all(&mut self) -> Result<(), ExecutionError> {
244        for promise in self.pending_queries.values_mut() {
245            promise.force()?;
246        }
247        Ok(())
248    }
249}
250
251type Keys = Vec<Vec<u8>>;
252type Value = Vec<u8>;
253type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
254
255#[derive(Debug, Default)]
256struct ViewUserState {
257    /// The contains-key queries in progress.
258    contains_key_queries: QueryManager<bool>,
259    /// The contains-keys queries in progress.
260    contains_keys_queries: QueryManager<Vec<bool>>,
261    /// The read-value queries in progress.
262    read_value_queries: QueryManager<Option<Value>>,
263    /// The read-multi-values queries in progress.
264    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
265    /// The find-keys queries in progress.
266    find_keys_queries: QueryManager<Keys>,
267    /// The find-key-values queries in progress.
268    find_key_values_queries: QueryManager<KeyValues>,
269}
270
271impl ViewUserState {
272    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
273        self.contains_key_queries.force_all()?;
274        self.contains_keys_queries.force_all()?;
275        self.read_value_queries.force_all()?;
276        self.read_multi_values_queries.force_all()?;
277        self.find_keys_queries.force_all()?;
278        self.find_key_values_queries.force_all()?;
279        Ok(())
280    }
281}
282
283impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
284    type Target = SyncRuntimeHandle<UserInstance>;
285
286    fn deref(&self) -> &Self::Target {
287        self.0.as_ref().expect(
288            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
289        )
290    }
291}
292
293impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
294    fn deref_mut(&mut self) -> &mut Self::Target {
295        self.0.as_mut().expect(
296            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
297        )
298    }
299}
300
301impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
302    fn drop(&mut self) {
303        // Ensure the `loaded_applications` are cleared to prevent circular references in
304        // the runtime
305        if let Some(handle) = self.0.take() {
306            handle.inner().loaded_applications.clear();
307        }
308    }
309}
310
311impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
312    #[expect(clippy::too_many_arguments)]
313    fn new(
314        chain_id: ChainId,
315        height: BlockHeight,
316        round: Option<u32>,
317        executing_message: Option<ExecutingMessage>,
318        execution_state_sender: ExecutionStateSender,
319        deadline: Option<Instant>,
320        refund_grant_to: Option<Account>,
321        resource_controller: ResourceController,
322        user_context: UserInstance::UserContext,
323        allow_application_logs: bool,
324    ) -> Self {
325        Self {
326            chain_id,
327            height,
328            round,
329            executing_message,
330            execution_state_sender,
331            is_finalizing: false,
332            applications_to_finalize: Vec::new(),
333            preloaded_applications: HashMap::new(),
334            loaded_applications: HashMap::new(),
335            call_stack: Vec::new(),
336            active_applications: HashSet::new(),
337            view_user_states: BTreeMap::new(),
338            deadline,
339            refund_grant_to,
340            resource_controller,
341            scheduled_operations: Vec::new(),
342            user_context,
343            allow_application_logs,
344        }
345    }
346
347    /// Returns the [`ApplicationStatus`] of the current application.
348    ///
349    /// The current application is the last to be pushed to the `call_stack`.
350    ///
351    /// # Panics
352    ///
353    /// If the call stack is empty.
354    fn current_application(&self) -> &ApplicationStatus {
355        self.call_stack
356            .last()
357            .expect("Call stack is unexpectedly empty")
358    }
359
360    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
361    ///
362    /// Ensures the application's ID is also tracked in the `active_applications` set.
363    fn push_application(&mut self, status: ApplicationStatus) {
364        self.active_applications.insert(status.id);
365        self.call_stack.push(status);
366    }
367
368    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
369    ///
370    /// Ensures the application's ID is also removed from the `active_applications` set.
371    ///
372    /// # Panics
373    ///
374    /// If the call stack is empty.
375    fn pop_application(&mut self) -> ApplicationStatus {
376        let status = self
377            .call_stack
378            .pop()
379            .expect("Can't remove application from empty call stack");
380        assert!(self.active_applications.remove(&status.id));
381        status
382    }
383
384    /// Ensures that a call to `application_id` is not-reentrant.
385    ///
386    /// Returns an error if there already is an entry for `application_id` in the call stack.
387    fn check_for_reentrancy(&self, application_id: ApplicationId) -> Result<(), ExecutionError> {
388        ensure!(
389            !self.active_applications.contains(&application_id),
390            ExecutionError::ReentrantCall(application_id)
391        );
392        Ok(())
393    }
394}
395
396impl SyncRuntimeInternal<UserContractInstance> {
397    /// Loads a contract instance, initializing it with this runtime if needed.
398    #[instrument(skip_all, fields(application_id = %id))]
399    fn load_contract_instance(
400        &mut self,
401        this: SyncRuntimeHandle<UserContractInstance>,
402        id: ApplicationId,
403    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
404        match self.loaded_applications.entry(id) {
405            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
406
407            hash_map::Entry::Vacant(entry) => {
408                // First time actually using the application. Let's see if the code was
409                // pre-loaded.
410                let (code, description) = match self.preloaded_applications.entry(id) {
411                    // TODO(#2927): support dynamic loading of modules on the Web
412                    #[cfg(web)]
413                    hash_map::Entry::Vacant(_) => {
414                        drop(this);
415                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
416                            id,
417                        )));
418                    }
419                    #[cfg(not(web))]
420                    hash_map::Entry::Vacant(entry) => {
421                        let (code, description) = self
422                            .execution_state_sender
423                            .send_request(move |callback| ExecutionRequest::LoadContract {
424                                id,
425                                callback,
426                            })?
427                            .recv_response()?;
428                        entry.insert((code, description)).clone()
429                    }
430                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
431                };
432                let instance = code.instantiate(this)?;
433
434                self.applications_to_finalize.push(id);
435                Ok(entry
436                    .insert(LoadedApplication::new(instance, description))
437                    .clone())
438            }
439        }
440    }
441
442    /// Configures the runtime for executing a call to a different contract.
443    fn prepare_for_call(
444        &mut self,
445        this: ContractSyncRuntimeHandle,
446        authenticated: bool,
447        callee_id: ApplicationId,
448    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
449        self.check_for_reentrancy(callee_id)?;
450
451        ensure!(
452            !self.is_finalizing,
453            ExecutionError::CrossApplicationCallInFinalize {
454                caller_id: Box::new(self.current_application().id),
455                callee_id: Box::new(callee_id),
456            }
457        );
458
459        // Load the application.
460        let application = self.load_contract_instance(this, callee_id)?;
461
462        let caller = self.current_application();
463        let caller_id = caller.id;
464        let caller_signer = caller.signer;
465        // Make the call to user code.
466        let authenticated_owner = match caller_signer {
467            Some(signer) if authenticated => Some(signer),
468            _ => None,
469        };
470        let authenticated_caller_id = authenticated.then_some(caller_id);
471        self.push_application(ApplicationStatus {
472            caller_id: authenticated_caller_id,
473            id: callee_id,
474            description: application.description,
475            // Allow further nested calls to be authenticated if this one is.
476            signer: authenticated_owner,
477        });
478        Ok(application.instance)
479    }
480
481    /// Cleans up the runtime after the execution of a call to a different contract.
482    fn finish_call(&mut self) {
483        self.pop_application();
484    }
485
486    /// Runs the service in a separate thread as an oracle.
487    fn run_service_oracle_query(
488        &mut self,
489        application_id: ApplicationId,
490        query: Vec<u8>,
491    ) -> Result<Vec<u8>, ExecutionError> {
492        let timeout = self
493            .resource_controller
494            .remaining_service_oracle_execution_time()?;
495        let execution_start = Instant::now();
496        let deadline = Some(execution_start + timeout);
497        let response = self
498            .execution_state_sender
499            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
500                deadline,
501                application_id,
502                next_block_height: self.height,
503                query,
504                callback,
505            })?
506            .recv_response()?;
507
508        self.resource_controller
509            .track_service_oracle_execution(execution_start.elapsed())?;
510        self.resource_controller
511            .track_service_oracle_response(response.len())?;
512
513        Ok(response)
514    }
515}
516
517impl SyncRuntimeInternal<UserServiceInstance> {
518    /// Initializes a service instance with this runtime.
519    fn load_service_instance(
520        &mut self,
521        this: SyncRuntimeHandle<UserServiceInstance>,
522        id: ApplicationId,
523    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
524        match self.loaded_applications.entry(id) {
525            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
526
527            hash_map::Entry::Vacant(entry) => {
528                // First time actually using the application. Let's see if the code was
529                // pre-loaded.
530                let (code, description) = match self.preloaded_applications.entry(id) {
531                    // TODO(#2927): support dynamic loading of modules on the Web
532                    #[cfg(web)]
533                    hash_map::Entry::Vacant(_) => {
534                        drop(this);
535                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
536                            id,
537                        )));
538                    }
539                    #[cfg(not(web))]
540                    hash_map::Entry::Vacant(entry) => {
541                        let (code, description) = self
542                            .execution_state_sender
543                            .send_request(move |callback| ExecutionRequest::LoadService {
544                                id,
545                                callback,
546                            })?
547                            .recv_response()?;
548                        entry.insert((code, description)).clone()
549                    }
550                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
551                };
552                let instance = code.instantiate(this)?;
553
554                self.applications_to_finalize.push(id);
555                Ok(entry
556                    .insert(LoadedApplication::new(instance, description))
557                    .clone())
558            }
559        }
560    }
561}
562
563impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
564    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
565        let handle = self.0.take().expect(
566            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
567        );
568        let runtime = Arc::into_inner(handle.0)?
569            .into_inner()
570            .expect("`SyncRuntime` should run in a single thread which should not panic");
571        Some(runtime)
572    }
573}
574
575impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
576    for SyncRuntimeHandle<UserInstance>
577{
578    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
579        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
580    }
581}
582
583impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
584    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
585        self.0
586            .try_lock()
587            .expect("Synchronous runtimes run on a single execution thread")
588    }
589}
590
591impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
592where
593    Self: ContractOrServiceRuntime,
594{
595    type Read = ();
596    type ReadValueBytes = u32;
597    type ContainsKey = u32;
598    type ContainsKeys = u32;
599    type ReadMultiValuesBytes = u32;
600    type FindKeysByPrefix = u32;
601    type FindKeyValuesByPrefix = u32;
602
603    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
604        let mut this = self.inner();
605        let chain_id = this.chain_id;
606        this.resource_controller.track_runtime_chain_id()?;
607        Ok(chain_id)
608    }
609
610    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
611        let mut this = self.inner();
612        let height = this.height;
613        this.resource_controller.track_runtime_block_height()?;
614        Ok(height)
615    }
616
617    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
618        let mut this = self.inner();
619        let application_id = this.current_application().id;
620        this.resource_controller.track_runtime_application_id()?;
621        Ok(application_id)
622    }
623
624    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
625        let mut this = self.inner();
626        let application_creator_chain_id = this.current_application().description.creator_chain_id;
627        this.resource_controller.track_runtime_application_id()?;
628        Ok(application_creator_chain_id)
629    }
630
631    fn read_application_description(
632        &mut self,
633        application_id: ApplicationId,
634    ) -> Result<ApplicationDescription, ExecutionError> {
635        let mut this = self.inner();
636        let description = this
637            .execution_state_sender
638            .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
639                application_id,
640                callback,
641            })?
642            .recv_response()?;
643        this.resource_controller
644            .track_runtime_application_description(&description)?;
645        Ok(description)
646    }
647
648    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
649        let mut this = self.inner();
650        let parameters = this.current_application().description.parameters.clone();
651        this.resource_controller
652            .track_runtime_application_parameters(&parameters)?;
653        Ok(parameters)
654    }
655
656    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
657        let mut this = self.inner();
658        let timestamp = this
659            .execution_state_sender
660            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
661            .recv_response()?;
662        this.resource_controller.track_runtime_timestamp()?;
663        Ok(timestamp)
664    }
665
666    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
667        let mut this = self.inner();
668        let balance = this
669            .execution_state_sender
670            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
671            .recv_response()?;
672        this.resource_controller.track_runtime_balance()?;
673        Ok(balance)
674    }
675
676    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
677        let mut this = self.inner();
678        let balance = this
679            .execution_state_sender
680            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
681            .recv_response()?;
682        this.resource_controller.track_runtime_balance()?;
683        Ok(balance)
684    }
685
686    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
687        let mut this = self.inner();
688        let owner_balances = this
689            .execution_state_sender
690            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
691            .recv_response()?;
692        this.resource_controller
693            .track_runtime_owner_balances(&owner_balances)?;
694        Ok(owner_balances)
695    }
696
697    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
698        let mut this = self.inner();
699        let owners = this
700            .execution_state_sender
701            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
702            .recv_response()?;
703        this.resource_controller.track_runtime_owners(&owners)?;
704        Ok(owners)
705    }
706
707    fn read_allowance(
708        &mut self,
709        owner: AccountOwner,
710        spender: AccountOwner,
711    ) -> Result<Amount, ExecutionError> {
712        let this = self.inner();
713        let allowance = this
714            .execution_state_sender
715            .send_request(|callback| ExecutionRequest::Allowance {
716                owner,
717                spender,
718                callback,
719            })?
720            .recv_response()?;
721        Ok(allowance)
722    }
723
724    fn read_allowances(
725        &mut self,
726    ) -> Result<Vec<(AccountOwner, AccountOwner, Amount)>, ExecutionError> {
727        let this = self.inner();
728        let allowances = this
729            .execution_state_sender
730            .send_request(|callback| ExecutionRequest::Allowances { callback })?
731            .recv_response()?;
732        Ok(allowances)
733    }
734
735    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
736        let mut this = self.inner();
737        let chain_ownership = this
738            .execution_state_sender
739            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
740            .recv_response()?;
741        this.resource_controller
742            .track_runtime_chain_ownership(&chain_ownership)?;
743        Ok(chain_ownership)
744    }
745
746    fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
747        let this = self.inner();
748        let application_permissions = this
749            .execution_state_sender
750            .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
751            .recv_response()?;
752        Ok(application_permissions)
753    }
754
755    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
756        let mut this = self.inner();
757        let id = this.current_application().id;
758        this.resource_controller.track_read_operation()?;
759        let receiver = this
760            .execution_state_sender
761            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
762        let state = this.view_user_states.entry(id).or_default();
763        state.contains_key_queries.register(receiver)
764    }
765
766    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
767        let mut this = self.inner();
768        let id = this.current_application().id;
769        let state = this.view_user_states.entry(id).or_default();
770        let value = state.contains_key_queries.wait(*promise)?;
771        Ok(value)
772    }
773
774    fn contains_keys_new(
775        &mut self,
776        keys: Vec<Vec<u8>>,
777    ) -> Result<Self::ContainsKeys, ExecutionError> {
778        let mut this = self.inner();
779        let id = this.current_application().id;
780        this.resource_controller.track_read_operation()?;
781        let receiver = this
782            .execution_state_sender
783            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
784        let state = this.view_user_states.entry(id).or_default();
785        state.contains_keys_queries.register(receiver)
786    }
787
788    fn contains_keys_wait(
789        &mut self,
790        promise: &Self::ContainsKeys,
791    ) -> Result<Vec<bool>, ExecutionError> {
792        let mut this = self.inner();
793        let id = this.current_application().id;
794        let state = this.view_user_states.entry(id).or_default();
795        let value = state.contains_keys_queries.wait(*promise)?;
796        Ok(value)
797    }
798
799    fn read_multi_values_bytes_new(
800        &mut self,
801        keys: Vec<Vec<u8>>,
802    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
803        let mut this = self.inner();
804        let id = this.current_application().id;
805        this.resource_controller.track_read_operation()?;
806        let receiver = this.execution_state_sender.send_request(move |callback| {
807            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
808        })?;
809        let state = this.view_user_states.entry(id).or_default();
810        state.read_multi_values_queries.register(receiver)
811    }
812
813    fn read_multi_values_bytes_wait(
814        &mut self,
815        promise: &Self::ReadMultiValuesBytes,
816    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
817        let mut this = self.inner();
818        let id = this.current_application().id;
819        let state = this.view_user_states.entry(id).or_default();
820        let values = state.read_multi_values_queries.wait(*promise)?;
821        for value in &values {
822            if let Some(value) = &value {
823                this.resource_controller
824                    .track_bytes_read(value.len() as u64)?;
825            }
826        }
827        Ok(values)
828    }
829
830    fn read_value_bytes_new(
831        &mut self,
832        key: Vec<u8>,
833    ) -> Result<Self::ReadValueBytes, ExecutionError> {
834        let mut this = self.inner();
835        let id = this.current_application().id;
836        this.resource_controller.track_read_operation()?;
837        let receiver = this
838            .execution_state_sender
839            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
840        let state = this.view_user_states.entry(id).or_default();
841        state.read_value_queries.register(receiver)
842    }
843
844    fn read_value_bytes_wait(
845        &mut self,
846        promise: &Self::ReadValueBytes,
847    ) -> Result<Option<Vec<u8>>, ExecutionError> {
848        let mut this = self.inner();
849        let id = this.current_application().id;
850        let value = {
851            let state = this.view_user_states.entry(id).or_default();
852            state.read_value_queries.wait(*promise)?
853        };
854        if let Some(value) = &value {
855            this.resource_controller
856                .track_bytes_read(value.len() as u64)?;
857        }
858        Ok(value)
859    }
860
861    fn find_keys_by_prefix_new(
862        &mut self,
863        key_prefix: Vec<u8>,
864    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
865        let mut this = self.inner();
866        let id = this.current_application().id;
867        this.resource_controller.track_read_operation()?;
868        let receiver = this.execution_state_sender.send_request(move |callback| {
869            ExecutionRequest::FindKeysByPrefix {
870                id,
871                key_prefix,
872                callback,
873            }
874        })?;
875        let state = this.view_user_states.entry(id).or_default();
876        state.find_keys_queries.register(receiver)
877    }
878
879    fn find_keys_by_prefix_wait(
880        &mut self,
881        promise: &Self::FindKeysByPrefix,
882    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
883        let mut this = self.inner();
884        let id = this.current_application().id;
885        let keys = {
886            let state = this.view_user_states.entry(id).or_default();
887            state.find_keys_queries.wait(*promise)?
888        };
889        let mut read_size = 0;
890        for key in &keys {
891            read_size += key.len();
892        }
893        this.resource_controller
894            .track_bytes_read(read_size as u64)?;
895        Ok(keys)
896    }
897
898    fn find_key_values_by_prefix_new(
899        &mut self,
900        key_prefix: Vec<u8>,
901    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
902        let mut this = self.inner();
903        let id = this.current_application().id;
904        this.resource_controller.track_read_operation()?;
905        let receiver = this.execution_state_sender.send_request(move |callback| {
906            ExecutionRequest::FindKeyValuesByPrefix {
907                id,
908                key_prefix,
909                callback,
910            }
911        })?;
912        let state = this.view_user_states.entry(id).or_default();
913        state.find_key_values_queries.register(receiver)
914    }
915
916    fn find_key_values_by_prefix_wait(
917        &mut self,
918        promise: &Self::FindKeyValuesByPrefix,
919    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
920        let mut this = self.inner();
921        let id = this.current_application().id;
922        let state = this.view_user_states.entry(id).or_default();
923        let key_values = state.find_key_values_queries.wait(*promise)?;
924        let mut read_size = 0;
925        for (key, value) in &key_values {
926            read_size += key.len() + value.len();
927        }
928        this.resource_controller
929            .track_bytes_read(read_size as u64)?;
930        Ok(key_values)
931    }
932
933    fn perform_http_request(
934        &mut self,
935        request: http::Request,
936    ) -> Result<http::Response, ExecutionError> {
937        let mut this = self.inner();
938        let app_permissions = this
939            .execution_state_sender
940            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
941            .recv_response()?;
942
943        let app_id = this.current_application().id;
944        ensure!(
945            app_permissions.can_make_http_requests(&app_id),
946            ExecutionError::UnauthorizedApplication(app_id)
947        );
948
949        this.resource_controller.track_http_request()?;
950
951        this.execution_state_sender
952            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
953                request,
954                http_responses_are_oracle_responses:
955                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
956                callback,
957            })?
958            .recv_response()
959    }
960
961    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
962        let this = self.inner();
963        this.execution_state_sender
964            .send_request(|callback| ExecutionRequest::AssertBefore {
965                timestamp,
966                callback,
967            })?
968            .recv_response()?
969    }
970
971    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
972        let this = self.inner();
973        let blob_id = hash.into();
974        let content = this
975            .execution_state_sender
976            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
977            .recv_response()?;
978        Ok(content.into_vec_or_clone())
979    }
980
981    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
982        let this = self.inner();
983        let blob_id = hash.into();
984        this.execution_state_sender
985            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
986            .recv_response()
987    }
988
989    fn has_empty_storage(&mut self, application: ApplicationId) -> Result<bool, ExecutionError> {
990        let this = self.inner();
991        let (key_size, value_size) = this
992            .execution_state_sender
993            .send_request(move |callback| ExecutionRequest::TotalStorageSize {
994                application,
995                callback,
996            })?
997            .recv_response()?;
998        Ok(key_size + value_size == 0)
999    }
1000
1001    fn maximum_blob_size(&mut self) -> Result<u64, ExecutionError> {
1002        Ok(self.inner().resource_controller.policy().maximum_blob_size)
1003    }
1004
1005    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
1006        Ok(self.inner().allow_application_logs)
1007    }
1008
1009    #[cfg(web)]
1010    fn send_log(&mut self, message: String, level: tracing::log::Level) {
1011        let this = self.inner();
1012        // Fire-and-forget: ignore errors since logging shouldn't affect execution.
1013        this.execution_state_sender
1014            .unbounded_send(ExecutionRequest::Log { message, level })
1015            .ok();
1016    }
1017}
1018
1019/// An extension trait to determine in compile time the different behaviors between contract and
1020/// services in the implementation of [`BaseRuntime`].
1021trait ContractOrServiceRuntime {
1022    /// Configured to `true` if the HTTP response size should be limited to the oracle response
1023    /// size.
1024    ///
1025    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
1026    /// and only storing in the block a shorter oracle response.
1027    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
1028}
1029
1030impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
1031    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
1032}
1033
1034impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
1035    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
1036}
1037
1038impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
1039    fn clone(&self) -> Self {
1040        SyncRuntimeHandle(self.0.clone())
1041    }
1042}
1043
1044impl ContractSyncRuntime {
1045    pub(crate) fn new(
1046        execution_state_sender: ExecutionStateSender,
1047        chain_id: ChainId,
1048        refund_grant_to: Option<Account>,
1049        resource_controller: ResourceController,
1050        action: &UserAction,
1051        allow_application_logs: bool,
1052    ) -> Self {
1053        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1054            SyncRuntimeInternal::new(
1055                chain_id,
1056                action.height(),
1057                action.round(),
1058                if let UserAction::Message(context, _) = action {
1059                    Some(context.into())
1060                } else {
1061                    None
1062                },
1063                execution_state_sender,
1064                None,
1065                refund_grant_to,
1066                resource_controller,
1067                action.timestamp(),
1068                allow_application_logs,
1069            ),
1070        )))
1071    }
1072
1073    /// Preloads the code of a contract into the runtime's memory.
1074    pub(crate) fn preload_contract(
1075        &self,
1076        id: ApplicationId,
1077        code: UserContractCode,
1078        description: ApplicationDescription,
1079    ) {
1080        let this = self
1081            .0
1082            .as_ref()
1083            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1084        let mut this_guard = this.inner();
1085
1086        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1087            entry.insert((code, description));
1088        }
1089    }
1090
1091    /// Main entry point to start executing a user action.
1092    pub(crate) fn run_action(
1093        mut self,
1094        application_id: ApplicationId,
1095        chain_id: ChainId,
1096        action: UserAction,
1097    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1098        let result = self
1099            .deref_mut()
1100            .run_action(application_id, chain_id, action)?;
1101        let runtime = self
1102            .into_inner()
1103            .expect("Runtime clones should have been freed by now");
1104
1105        Ok((result, runtime.resource_controller))
1106    }
1107}
1108
1109impl ContractSyncRuntimeHandle {
1110    #[instrument(skip_all, fields(application_id = %application_id))]
1111    fn run_action(
1112        &self,
1113        application_id: ApplicationId,
1114        chain_id: ChainId,
1115        action: UserAction,
1116    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1117        let finalize_context = FinalizeContext {
1118            authenticated_owner: action.signer(),
1119            chain_id,
1120            height: action.height(),
1121            round: action.round(),
1122        };
1123
1124        {
1125            let runtime = self.inner();
1126            assert_eq!(runtime.chain_id, chain_id);
1127            assert_eq!(runtime.height, action.height());
1128        }
1129
1130        let signer = action.signer();
1131        let closure = move |code: &mut UserContractInstance| match action {
1132            UserAction::Instantiate(_context, argument) => {
1133                code.instantiate(argument).map(|()| None)
1134            }
1135            UserAction::Operation(_context, operation) => {
1136                code.execute_operation(operation).map(Option::Some)
1137            }
1138            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1139            UserAction::ProcessStreams(_context, updates) => {
1140                code.process_streams(updates).map(|()| None)
1141            }
1142        };
1143
1144        let result = self.execute(application_id, signer, closure)?;
1145        self.finalize(finalize_context)?;
1146        Ok(result)
1147    }
1148
1149    /// Notifies all loaded applications that execution is finalizing.
1150    #[instrument(skip_all)]
1151    fn finalize(&self, context: FinalizeContext) -> Result<(), ExecutionError> {
1152        let applications = mem::take(&mut self.inner().applications_to_finalize)
1153            .into_iter()
1154            .rev();
1155
1156        self.inner().is_finalizing = true;
1157
1158        for application in applications {
1159            self.execute(application, context.authenticated_owner, |contract| {
1160                contract.finalize().map(|_| None)
1161            })?;
1162            self.inner().loaded_applications.remove(&application);
1163        }
1164
1165        Ok(())
1166    }
1167
1168    /// Executes a `closure` with the contract code for the `application_id`.
1169    #[instrument(skip_all, fields(application_id = %application_id))]
1170    fn execute(
1171        &self,
1172        application_id: ApplicationId,
1173        signer: Option<AccountOwner>,
1174        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1175    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1176        let contract = {
1177            let mut runtime = self.inner();
1178            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1179
1180            let status = ApplicationStatus {
1181                caller_id: None,
1182                id: application_id,
1183                description: application.description.clone(),
1184                signer,
1185            };
1186
1187            runtime.push_application(status);
1188
1189            application
1190        };
1191
1192        let result = closure(
1193            &mut contract
1194                .instance
1195                .try_lock()
1196                .expect("Application should not be already executing"),
1197        )?;
1198
1199        let mut runtime = self.inner();
1200        let application_status = runtime.pop_application();
1201        assert_eq!(application_status.caller_id, None);
1202        assert_eq!(application_status.id, application_id);
1203        assert_eq!(application_status.description, contract.description);
1204        assert_eq!(application_status.signer, signer);
1205        assert!(runtime.call_stack.is_empty());
1206
1207        Ok(result)
1208    }
1209}
1210
1211impl ContractRuntime for ContractSyncRuntimeHandle {
1212    fn authenticated_owner(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1213        let this = self.inner();
1214        Ok(this.current_application().signer)
1215    }
1216
1217    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1218        Ok(self
1219            .inner()
1220            .executing_message
1221            .map(|metadata| metadata.is_bouncing))
1222    }
1223
1224    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1225        Ok(self
1226            .inner()
1227            .executing_message
1228            .map(|metadata| metadata.origin))
1229    }
1230
1231    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1232        let this = self.inner();
1233        if this.call_stack.len() <= 1 {
1234            return Ok(None);
1235        }
1236        Ok(this.current_application().caller_id)
1237    }
1238
1239    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1240        Ok(match vm_runtime {
1241            VmRuntime::Wasm => {
1242                self.inner()
1243                    .resource_controller
1244                    .policy()
1245                    .maximum_wasm_fuel_per_block
1246            }
1247            VmRuntime::Evm => {
1248                self.inner()
1249                    .resource_controller
1250                    .policy()
1251                    .maximum_evm_fuel_per_block
1252            }
1253        })
1254    }
1255
1256    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1257        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1258    }
1259
1260    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1261        let mut this = self.inner();
1262        this.resource_controller.track_fuel(fuel, vm_runtime)
1263    }
1264
1265    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1266        let mut this = self.inner();
1267        let application = this.current_application();
1268        let application_id = application.id;
1269        let authenticated_owner = application.signer;
1270        let mut refund_grant_to = this.refund_grant_to;
1271
1272        let grant = this
1273            .resource_controller
1274            .policy()
1275            .total_price(&message.grant)?;
1276        if grant.is_zero() {
1277            refund_grant_to = None;
1278        } else {
1279            this.resource_controller.track_grant(grant)?;
1280        }
1281        let kind = if message.is_tracked {
1282            MessageKind::Tracked
1283        } else {
1284            MessageKind::Simple
1285        };
1286
1287        this.execution_state_sender
1288            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1289                message: OutgoingMessage {
1290                    destination: message.destination,
1291                    authenticated_owner,
1292                    refund_grant_to,
1293                    grant,
1294                    kind,
1295                    message: Message::User {
1296                        application_id,
1297                        bytes: message.message,
1298                    },
1299                },
1300                callback,
1301            })?
1302            .recv_response()?;
1303
1304        Ok(())
1305    }
1306
1307    fn transfer(
1308        &mut self,
1309        source: AccountOwner,
1310        destination: Account,
1311        amount: Amount,
1312    ) -> Result<(), ExecutionError> {
1313        let this = self.inner();
1314        let current_application = this.current_application();
1315        let application_id = current_application.id;
1316        let signer = current_application.signer;
1317
1318        this.execution_state_sender
1319            .send_request(|callback| ExecutionRequest::Transfer {
1320                source,
1321                destination,
1322                amount,
1323                signer,
1324                application_id,
1325                callback,
1326            })?
1327            .recv_response()?;
1328        Ok(())
1329    }
1330
1331    fn claim(
1332        &mut self,
1333        source: Account,
1334        destination: Account,
1335        amount: Amount,
1336    ) -> Result<(), ExecutionError> {
1337        let this = self.inner();
1338        let current_application = this.current_application();
1339        let application_id = current_application.id;
1340        let signer = current_application.signer;
1341
1342        this.execution_state_sender
1343            .send_request(|callback| ExecutionRequest::Claim {
1344                source,
1345                destination,
1346                amount,
1347                signer,
1348                application_id,
1349                callback,
1350            })?
1351            .recv_response()?;
1352        Ok(())
1353    }
1354
1355    fn approve(
1356        &mut self,
1357        owner: AccountOwner,
1358        spender: AccountOwner,
1359        amount: Amount,
1360    ) -> Result<(), ExecutionError> {
1361        let this = self.inner();
1362        let current_application = this.current_application();
1363        let application_id = current_application.id;
1364        let signer = current_application.signer;
1365
1366        this.execution_state_sender
1367            .send_request(|callback| ExecutionRequest::Approve {
1368                owner,
1369                spender,
1370                amount,
1371                signer,
1372                application_id,
1373                callback,
1374            })?
1375            .recv_response()?;
1376        Ok(())
1377    }
1378
1379    fn transfer_from(
1380        &mut self,
1381        owner: AccountOwner,
1382        spender: AccountOwner,
1383        destination: Account,
1384        amount: Amount,
1385    ) -> Result<(), ExecutionError> {
1386        let this = self.inner();
1387        let current_application = this.current_application();
1388        let application_id = current_application.id;
1389        let signer = current_application.signer;
1390
1391        this.execution_state_sender
1392            .send_request(|callback| ExecutionRequest::TransferFrom {
1393                owner,
1394                spender,
1395                destination,
1396                amount,
1397                signer,
1398                application_id,
1399                callback,
1400            })?
1401            .recv_response()?;
1402        Ok(())
1403    }
1404
1405    fn try_call_application(
1406        &mut self,
1407        authenticated: bool,
1408        callee_id: ApplicationId,
1409        argument: Vec<u8>,
1410    ) -> Result<Vec<u8>, ExecutionError> {
1411        let contract = self
1412            .inner()
1413            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1414
1415        let value = contract
1416            .try_lock()
1417            .expect("Applications should not have reentrant calls")
1418            .execute_operation(argument)?;
1419
1420        self.inner().finish_call();
1421
1422        Ok(value)
1423    }
1424
1425    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1426        let mut this = self.inner();
1427        ensure!(
1428            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1429            ExecutionError::StreamNameTooLong
1430        );
1431        let application_id = GenericApplicationId::User(this.current_application().id);
1432        let stream_id = StreamId {
1433            stream_name,
1434            application_id,
1435        };
1436        let value_len = value.len() as u64;
1437        let index = this
1438            .execution_state_sender
1439            .send_request(|callback| ExecutionRequest::Emit {
1440                stream_id,
1441                value,
1442                callback,
1443            })?
1444            .recv_response()?;
1445        // TODO(#365): Consider separate event fee categories.
1446        this.resource_controller.track_bytes_written(value_len)?;
1447        Ok(index)
1448    }
1449
1450    fn read_event(
1451        &mut self,
1452        chain_id: ChainId,
1453        stream_name: StreamName,
1454        index: u32,
1455    ) -> Result<Vec<u8>, ExecutionError> {
1456        let mut this = self.inner();
1457        ensure!(
1458            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1459            ExecutionError::StreamNameTooLong
1460        );
1461        let application_id = GenericApplicationId::User(this.current_application().id);
1462        let stream_id = StreamId {
1463            stream_name,
1464            application_id,
1465        };
1466        let event_id = EventId {
1467            stream_id,
1468            index,
1469            chain_id,
1470        };
1471        let event = this
1472            .execution_state_sender
1473            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1474            .recv_response()?;
1475        // TODO(#365): Consider separate event fee categories.
1476        this.resource_controller
1477            .track_bytes_read(event.len() as u64)?;
1478        Ok(event)
1479    }
1480
1481    fn subscribe_to_events(
1482        &mut self,
1483        chain_id: ChainId,
1484        application_id: ApplicationId,
1485        stream_name: StreamName,
1486    ) -> Result<(), ExecutionError> {
1487        let this = self.inner();
1488        ensure!(
1489            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1490            ExecutionError::StreamNameTooLong
1491        );
1492        let stream_id = StreamId {
1493            stream_name,
1494            application_id: application_id.into(),
1495        };
1496        let subscriber_app_id = this.current_application().id;
1497        this.execution_state_sender
1498            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1499                chain_id,
1500                stream_id,
1501                subscriber_app_id,
1502                callback,
1503            })?
1504            .recv_response()?;
1505        Ok(())
1506    }
1507
1508    fn unsubscribe_from_events(
1509        &mut self,
1510        chain_id: ChainId,
1511        application_id: ApplicationId,
1512        stream_name: StreamName,
1513    ) -> Result<(), ExecutionError> {
1514        let this = self.inner();
1515        ensure!(
1516            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1517            ExecutionError::StreamNameTooLong
1518        );
1519        let stream_id = StreamId {
1520            stream_name,
1521            application_id: application_id.into(),
1522        };
1523        let subscriber_app_id = this.current_application().id;
1524        this.execution_state_sender
1525            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1526                chain_id,
1527                stream_id,
1528                subscriber_app_id,
1529                callback,
1530            })?
1531            .recv_response()?;
1532        Ok(())
1533    }
1534
1535    fn query_service(
1536        &mut self,
1537        application_id: ApplicationId,
1538        query: Vec<u8>,
1539    ) -> Result<Vec<u8>, ExecutionError> {
1540        let mut this = self.inner();
1541
1542        let app_permissions = this
1543            .execution_state_sender
1544            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1545            .recv_response()?;
1546
1547        let app_id = this.current_application().id;
1548        ensure!(
1549            app_permissions.can_call_services(&app_id),
1550            ExecutionError::UnauthorizedApplication(app_id)
1551        );
1552
1553        this.resource_controller.track_service_oracle_call()?;
1554
1555        this.run_service_oracle_query(application_id, query)
1556    }
1557
1558    fn open_chain(
1559        &mut self,
1560        ownership: ChainOwnership,
1561        application_permissions: ApplicationPermissions,
1562        balance: Amount,
1563    ) -> Result<ChainId, ExecutionError> {
1564        let parent_id = self.inner().chain_id;
1565        let block_height = self.block_height()?;
1566
1567        let timestamp = self.inner().user_context;
1568
1569        let chain_id = self
1570            .inner()
1571            .execution_state_sender
1572            .send_request(|callback| ExecutionRequest::OpenChain {
1573                ownership,
1574                balance,
1575                parent_id,
1576                block_height,
1577                timestamp,
1578                application_permissions,
1579                callback,
1580            })?
1581            .recv_response()?;
1582
1583        Ok(chain_id)
1584    }
1585
1586    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1587        let this = self.inner();
1588        let application_id = this.current_application().id;
1589        this.execution_state_sender
1590            .send_request(|callback| ExecutionRequest::CloseChain {
1591                application_id,
1592                callback,
1593            })?
1594            .recv_response()?
1595    }
1596
1597    fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1598        let this = self.inner();
1599        let application_id = this.current_application().id;
1600        this.execution_state_sender
1601            .send_request(|callback| ExecutionRequest::ChangeOwnership {
1602                application_id,
1603                ownership,
1604                callback,
1605            })?
1606            .recv_response()?
1607    }
1608
1609    fn change_application_permissions(
1610        &mut self,
1611        application_permissions: ApplicationPermissions,
1612    ) -> Result<(), ExecutionError> {
1613        let this = self.inner();
1614        let application_id = this.current_application().id;
1615        this.execution_state_sender
1616            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1617                application_id,
1618                application_permissions,
1619                callback,
1620            })?
1621            .recv_response()?
1622    }
1623
1624    fn peek_application_index(&mut self) -> Result<u32, ExecutionError> {
1625        let index = self
1626            .inner()
1627            .execution_state_sender
1628            .send_request(move |callback| ExecutionRequest::PeekApplicationIndex { callback })?
1629            .recv_response()?;
1630        Ok(index)
1631    }
1632
1633    fn create_application(
1634        &mut self,
1635        module_id: ModuleId,
1636        parameters: Vec<u8>,
1637        argument: Vec<u8>,
1638        required_application_ids: Vec<ApplicationId>,
1639    ) -> Result<ApplicationId, ExecutionError> {
1640        let chain_id = self.inner().chain_id;
1641        let block_height = self.block_height()?;
1642
1643        let CreateApplicationResult { app_id } = self
1644            .inner()
1645            .execution_state_sender
1646            .send_request(move |callback| ExecutionRequest::CreateApplication {
1647                chain_id,
1648                block_height,
1649                module_id,
1650                parameters,
1651                required_application_ids,
1652                callback,
1653            })?
1654            .recv_response()?;
1655
1656        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1657
1658        contract
1659            .try_lock()
1660            .expect("Applications should not have reentrant calls")
1661            .instantiate(argument)?;
1662
1663        self.inner().finish_call();
1664
1665        Ok(app_id)
1666    }
1667
1668    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1669        let blob = Blob::new_data(bytes);
1670        let blob_id = blob.id();
1671        let this = self.inner();
1672        this.execution_state_sender
1673            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1674            .recv_response()?;
1675        Ok(DataBlobHash(blob_id.hash))
1676    }
1677
1678    fn publish_module(
1679        &mut self,
1680        contract: Bytecode,
1681        service: Bytecode,
1682        vm_runtime: VmRuntime,
1683    ) -> Result<ModuleId, ExecutionError> {
1684        let (blobs, module_id) =
1685            crate::runtime::create_bytecode_blobs_sync(&contract, &service, vm_runtime);
1686        let this = self.inner();
1687        for blob in blobs {
1688            this.execution_state_sender
1689                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1690                .recv_response()?;
1691        }
1692        Ok(module_id)
1693    }
1694
1695    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1696        let this = self.inner();
1697        let round = this.round;
1698        this.execution_state_sender
1699            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1700            .recv_response()
1701    }
1702
1703    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1704        let mut this = self.inner();
1705        let id = this.current_application().id;
1706        let state = this.view_user_states.entry(id).or_default();
1707        state.force_all_pending_queries()?;
1708        this.resource_controller.track_write_operations(
1709            batch
1710                .num_operations()
1711                .try_into()
1712                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1713        )?;
1714        this.resource_controller
1715            .track_bytes_written(batch.size() as u64)?;
1716        this.execution_state_sender
1717            .send_request(|callback| ExecutionRequest::WriteBatch {
1718                id,
1719                batch,
1720                callback,
1721            })?
1722            .recv_response()?;
1723        Ok(())
1724    }
1725}
1726
1727impl ServiceSyncRuntime {
1728    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1729    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1730        Self::new_with_deadline(execution_state_sender, context, None)
1731    }
1732
1733    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1734    pub fn new_with_deadline(
1735        execution_state_sender: ExecutionStateSender,
1736        context: QueryContext,
1737        deadline: Option<Instant>,
1738    ) -> Self {
1739        // Query the allow_application_logs setting from the execution state.
1740        let allow_application_logs = execution_state_sender
1741            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1742            .ok()
1743            .and_then(|receiver| receiver.recv_response().ok())
1744            .unwrap_or(false);
1745
1746        let runtime = SyncRuntime(Some(
1747            SyncRuntimeInternal::new(
1748                context.chain_id,
1749                context.next_block_height,
1750                None,
1751                None,
1752                execution_state_sender,
1753                deadline,
1754                None,
1755                ResourceController::default(),
1756                (),
1757                allow_application_logs,
1758            )
1759            .into(),
1760        ));
1761
1762        ServiceSyncRuntime {
1763            runtime,
1764            current_context: context,
1765        }
1766    }
1767
1768    /// Preloads the code of a service into the runtime's memory.
1769    pub(crate) fn preload_service(
1770        &self,
1771        id: ApplicationId,
1772        code: UserServiceCode,
1773        description: ApplicationDescription,
1774    ) {
1775        let this = self
1776            .runtime
1777            .0
1778            .as_ref()
1779            .expect("services shouldn't be preloaded while the runtime is being dropped");
1780        let mut this_guard = this.inner();
1781
1782        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1783            entry.insert((code, description));
1784        }
1785    }
1786
1787    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1788    pub fn run(&mut self, incoming_requests: &std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1789        while let Ok(request) = incoming_requests.recv() {
1790            let ServiceRuntimeRequest::Query {
1791                application_id,
1792                context,
1793                query,
1794                callback,
1795            } = request;
1796
1797            let result = self
1798                .prepare_for_query(context)
1799                .and_then(|()| self.run_query(application_id, query));
1800
1801            if let Err(err) = callback.send(result) {
1802                tracing::debug!(%err, "Receiver for query result has been dropped");
1803            }
1804        }
1805    }
1806
1807    /// Prepares the runtime to query an application.
1808    pub(crate) fn prepare_for_query(
1809        &mut self,
1810        new_context: QueryContext,
1811    ) -> Result<(), ExecutionError> {
1812        let expected_context = QueryContext {
1813            local_time: new_context.local_time,
1814            ..self.current_context
1815        };
1816
1817        if new_context != expected_context {
1818            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1819            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1820        } else {
1821            self.handle_mut()
1822                .inner()
1823                .execution_state_sender
1824                .send_request(|callback| ExecutionRequest::SetLocalTime {
1825                    local_time: new_context.local_time,
1826                    callback,
1827                })?
1828                .recv_response()?;
1829        }
1830        Ok(())
1831    }
1832
1833    /// Queries an application specified by its [`ApplicationId`].
1834    pub(crate) fn run_query(
1835        &mut self,
1836        application_id: ApplicationId,
1837        query: Vec<u8>,
1838    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1839        let this = self.handle_mut();
1840        let response = this.try_query_application(application_id, query)?;
1841        let operations = mem::take(&mut this.inner().scheduled_operations);
1842
1843        Ok(QueryOutcome {
1844            response,
1845            operations,
1846        })
1847    }
1848
1849    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1850    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1851        self.runtime.0.as_mut().expect(
1852            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1853        )
1854    }
1855}
1856
1857impl ServiceRuntime for ServiceSyncRuntimeHandle {
1858    /// Note that queries are not available from writable contexts.
1859    fn try_query_application(
1860        &mut self,
1861        queried_id: ApplicationId,
1862        argument: Vec<u8>,
1863    ) -> Result<Vec<u8>, ExecutionError> {
1864        let service = {
1865            let mut this = self.inner();
1866
1867            // Load the application.
1868            let application = this.load_service_instance(self.clone(), queried_id)?;
1869            // Make the call to user code.
1870            this.push_application(ApplicationStatus {
1871                caller_id: None,
1872                id: queried_id,
1873                description: application.description,
1874                signer: None,
1875            });
1876            application.instance
1877        };
1878        let response = service
1879            .try_lock()
1880            .expect("Applications should not have reentrant calls")
1881            .handle_query(argument)?;
1882        self.inner().pop_application();
1883        Ok(response)
1884    }
1885
1886    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1887        let mut this = self.inner();
1888        let application_id = this.current_application().id;
1889
1890        this.scheduled_operations.push(Operation::User {
1891            application_id,
1892            bytes: operation,
1893        });
1894
1895        Ok(())
1896    }
1897
1898    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1899        if let Some(deadline) = self.inner().deadline {
1900            if Instant::now() >= deadline {
1901                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1902            }
1903        }
1904        Ok(())
1905    }
1906}
1907
1908/// A request to the service runtime actor.
1909pub enum ServiceRuntimeRequest {
1910    Query {
1911        application_id: ApplicationId,
1912        context: QueryContext,
1913        query: Vec<u8>,
1914        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1915    },
1916}
1917
1918/// The origin of the execution.
1919#[derive(Clone, Copy, Debug)]
1920struct ExecutingMessage {
1921    is_bouncing: bool,
1922    origin: ChainId,
1923}
1924
1925impl From<&MessageContext> for ExecutingMessage {
1926    fn from(context: &MessageContext) -> Self {
1927        ExecutingMessage {
1928            is_bouncing: context.is_bouncing,
1929            origin: context.origin,
1930        }
1931    }
1932}
1933
1934/// Creates a compressed contract and service bytecode synchronously.
1935pub fn create_bytecode_blobs_sync(
1936    contract: &Bytecode,
1937    service: &Bytecode,
1938    vm_runtime: VmRuntime,
1939) -> (Vec<Blob>, ModuleId) {
1940    match vm_runtime {
1941        VmRuntime::Wasm => {
1942            let compressed_contract = contract.compress();
1943            let compressed_service = service.compress();
1944            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1945            let service_blob = Blob::new_service_bytecode(compressed_service);
1946            let module_id =
1947                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1948            (vec![contract_blob, service_blob], module_id)
1949        }
1950        VmRuntime::Evm => {
1951            let compressed_contract = contract.compress();
1952            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1953            let module_id = ModuleId::new(
1954                evm_contract_blob.id().hash,
1955                evm_contract_blob.id().hash,
1956                vm_runtime,
1957            );
1958            (vec![evm_contract_blob], module_id)
1959        }
1960    }
1961}