linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29    execution::UserAction,
30    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31    resources::ResourceController,
32    system::CreateApplicationResult,
33    util::{ReceiverExt, UnboundedSenderExt},
34    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
44pub trait WithContext {
45    type UserContext;
46    type Code;
47}
48
49impl WithContext for UserContractInstance {
50    type UserContext = Timestamp;
51    type Code = UserContractCode;
52}
53
54impl WithContext for UserServiceInstance {
55    type UserContext = ();
56    type Code = UserServiceCode;
57}
58
59#[cfg(test)]
60impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
61    type UserContext = ();
62    type Code = ();
63}
64
65#[derive(Debug)]
66pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
67
68pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
69
70pub struct ServiceSyncRuntime {
71    runtime: SyncRuntime<UserServiceInstance>,
72    current_context: QueryContext,
73}
74
75#[derive(Debug)]
76pub struct SyncRuntimeHandle<UserInstance: WithContext>(
77    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
78);
79
80pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
81pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
82
83/// Runtime data tracked during the execution of a transaction on the synchronous thread.
84#[derive(Debug)]
85pub struct SyncRuntimeInternal<UserInstance: WithContext> {
86    /// The current chain ID.
87    chain_id: ChainId,
88    /// The height of the next block that will be added to this chain. During operations
89    /// and messages, this is the current block height.
90    height: BlockHeight,
91    /// The current consensus round. Only available during block validation in multi-leader rounds.
92    round: Option<u32>,
93    /// The current message being executed, if there is one.
94    #[debug(skip_if = Option::is_none)]
95    executing_message: Option<ExecutingMessage>,
96
97    /// How to interact with the storage view of the execution state.
98    execution_state_sender: ExecutionStateSender,
99
100    /// If applications are being finalized.
101    ///
102    /// If [`true`], disables cross-application calls.
103    is_finalizing: bool,
104    /// Applications that need to be finalized.
105    applications_to_finalize: Vec<ApplicationId>,
106
107    /// Application instances loaded in this transaction.
108    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
109    /// Application instances loaded in this transaction.
110    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
111    /// The current stack of application descriptions.
112    call_stack: Vec<ApplicationStatus>,
113    /// The set of the IDs of the applications that are in the `call_stack`.
114    active_applications: HashSet<ApplicationId>,
115    /// The operations scheduled during this query.
116    scheduled_operations: Vec<Operation>,
117
118    /// Track application states based on views.
119    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
120
121    /// The deadline this runtime should finish executing.
122    ///
123    /// Used to limit the execution time of services running as oracles.
124    deadline: Option<Instant>,
125
126    /// Where to send a refund for the unused part of the grant after execution, if any.
127    #[debug(skip_if = Option::is_none)]
128    refund_grant_to: Option<Account>,
129    /// Controller to track fuel and storage consumption.
130    resource_controller: ResourceController,
131    /// Additional context for the runtime.
132    user_context: UserInstance::UserContext,
133    /// Whether contract log messages should be output.
134    allow_application_logs: bool,
135}
136
137/// The runtime status of an application.
138#[derive(Debug)]
139struct ApplicationStatus {
140    /// The caller application ID, if forwarded during the call.
141    caller_id: Option<ApplicationId>,
142    /// The application ID.
143    id: ApplicationId,
144    /// The application description.
145    description: ApplicationDescription,
146    /// The authenticated owner for the execution thread, if any.
147    signer: Option<AccountOwner>,
148}
149
150/// A loaded application instance.
151#[derive(Debug)]
152struct LoadedApplication<Instance> {
153    instance: Arc<Mutex<Instance>>,
154    description: ApplicationDescription,
155}
156
157impl<Instance> LoadedApplication<Instance> {
158    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
159    fn new(instance: Instance, description: ApplicationDescription) -> Self {
160        LoadedApplication {
161            instance: Arc::new(Mutex::new(instance)),
162            description,
163        }
164    }
165}
166
167impl<Instance> Clone for LoadedApplication<Instance> {
168    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
169    // bound
170    fn clone(&self) -> Self {
171        LoadedApplication {
172            instance: self.instance.clone(),
173            description: self.description.clone(),
174        }
175    }
176}
177
178#[derive(Debug)]
179enum Promise<T> {
180    Ready(T),
181    Pending(Receiver<T>),
182}
183
184impl<T> Promise<T> {
185    fn force(&mut self) -> Result<(), ExecutionError> {
186        if let Promise::Pending(receiver) = self {
187            let value = receiver
188                .recv_ref()
189                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
190            *self = Promise::Ready(value);
191        }
192        Ok(())
193    }
194
195    fn read(self) -> Result<T, ExecutionError> {
196        match self {
197            Promise::Pending(receiver) => {
198                let value = receiver.recv_response()?;
199                Ok(value)
200            }
201            Promise::Ready(value) => Ok(value),
202        }
203    }
204}
205
206/// Manages a set of pending queries returning values of type `T`.
207#[derive(Debug, Default)]
208struct QueryManager<T> {
209    /// The queries in progress.
210    pending_queries: BTreeMap<u32, Promise<T>>,
211    /// The number of queries ever registered so far. Used for the index of the next query.
212    query_count: u32,
213    /// The number of active queries.
214    active_query_count: u32,
215}
216
217impl<T> QueryManager<T> {
218    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
219        let id = self.query_count;
220        self.pending_queries.insert(id, Promise::Pending(receiver));
221        self.query_count = self
222            .query_count
223            .checked_add(1)
224            .ok_or(ArithmeticError::Overflow)?;
225        self.active_query_count = self
226            .active_query_count
227            .checked_add(1)
228            .ok_or(ArithmeticError::Overflow)?;
229        Ok(id)
230    }
231
232    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
233        let promise = self
234            .pending_queries
235            .remove(&id)
236            .ok_or(ExecutionError::InvalidPromise)?;
237        let value = promise.read()?;
238        self.active_query_count -= 1;
239        Ok(value)
240    }
241
242    fn force_all(&mut self) -> Result<(), ExecutionError> {
243        for promise in self.pending_queries.values_mut() {
244            promise.force()?;
245        }
246        Ok(())
247    }
248}
249
250type Keys = Vec<Vec<u8>>;
251type Value = Vec<u8>;
252type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
253
254#[derive(Debug, Default)]
255struct ViewUserState {
256    /// The contains-key queries in progress.
257    contains_key_queries: QueryManager<bool>,
258    /// The contains-keys queries in progress.
259    contains_keys_queries: QueryManager<Vec<bool>>,
260    /// The read-value queries in progress.
261    read_value_queries: QueryManager<Option<Value>>,
262    /// The read-multi-values queries in progress.
263    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
264    /// The find-keys queries in progress.
265    find_keys_queries: QueryManager<Keys>,
266    /// The find-key-values queries in progress.
267    find_key_values_queries: QueryManager<KeyValues>,
268}
269
270impl ViewUserState {
271    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
272        self.contains_key_queries.force_all()?;
273        self.contains_keys_queries.force_all()?;
274        self.read_value_queries.force_all()?;
275        self.read_multi_values_queries.force_all()?;
276        self.find_keys_queries.force_all()?;
277        self.find_key_values_queries.force_all()?;
278        Ok(())
279    }
280}
281
282impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
283    type Target = SyncRuntimeHandle<UserInstance>;
284
285    fn deref(&self) -> &Self::Target {
286        self.0.as_ref().expect(
287            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288        )
289    }
290}
291
292impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
293    fn deref_mut(&mut self) -> &mut Self::Target {
294        self.0.as_mut().expect(
295            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
296        )
297    }
298}
299
300impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
301    fn drop(&mut self) {
302        // Ensure the `loaded_applications` are cleared to prevent circular references in
303        // the runtime
304        if let Some(handle) = self.0.take() {
305            handle.inner().loaded_applications.clear();
306        }
307    }
308}
309
310impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
311    #[expect(clippy::too_many_arguments)]
312    fn new(
313        chain_id: ChainId,
314        height: BlockHeight,
315        round: Option<u32>,
316        executing_message: Option<ExecutingMessage>,
317        execution_state_sender: ExecutionStateSender,
318        deadline: Option<Instant>,
319        refund_grant_to: Option<Account>,
320        resource_controller: ResourceController,
321        user_context: UserInstance::UserContext,
322        allow_application_logs: bool,
323    ) -> Self {
324        Self {
325            chain_id,
326            height,
327            round,
328            executing_message,
329            execution_state_sender,
330            is_finalizing: false,
331            applications_to_finalize: Vec::new(),
332            preloaded_applications: HashMap::new(),
333            loaded_applications: HashMap::new(),
334            call_stack: Vec::new(),
335            active_applications: HashSet::new(),
336            view_user_states: BTreeMap::new(),
337            deadline,
338            refund_grant_to,
339            resource_controller,
340            scheduled_operations: Vec::new(),
341            user_context,
342            allow_application_logs,
343        }
344    }
345
346    /// Returns the [`ApplicationStatus`] of the current application.
347    ///
348    /// The current application is the last to be pushed to the `call_stack`.
349    ///
350    /// # Panics
351    ///
352    /// If the call stack is empty.
353    fn current_application(&self) -> &ApplicationStatus {
354        self.call_stack
355            .last()
356            .expect("Call stack is unexpectedly empty")
357    }
358
359    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
360    ///
361    /// Ensures the application's ID is also tracked in the `active_applications` set.
362    fn push_application(&mut self, status: ApplicationStatus) {
363        self.active_applications.insert(status.id);
364        self.call_stack.push(status);
365    }
366
367    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
368    ///
369    /// Ensures the application's ID is also removed from the `active_applications` set.
370    ///
371    /// # Panics
372    ///
373    /// If the call stack is empty.
374    fn pop_application(&mut self) -> ApplicationStatus {
375        let status = self
376            .call_stack
377            .pop()
378            .expect("Can't remove application from empty call stack");
379        assert!(self.active_applications.remove(&status.id));
380        status
381    }
382
383    /// Ensures that a call to `application_id` is not-reentrant.
384    ///
385    /// Returns an error if there already is an entry for `application_id` in the call stack.
386    fn check_for_reentrancy(
387        &mut self,
388        application_id: ApplicationId,
389    ) -> Result<(), ExecutionError> {
390        ensure!(
391            !self.active_applications.contains(&application_id),
392            ExecutionError::ReentrantCall(application_id)
393        );
394        Ok(())
395    }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399    /// Loads a contract instance, initializing it with this runtime if needed.
400    fn load_contract_instance(
401        &mut self,
402        this: SyncRuntimeHandle<UserContractInstance>,
403        id: ApplicationId,
404    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405        match self.loaded_applications.entry(id) {
406            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
407
408            hash_map::Entry::Vacant(entry) => {
409                // First time actually using the application. Let's see if the code was
410                // pre-loaded.
411                let (code, description) = match self.preloaded_applications.entry(id) {
412                    // TODO(#2927): support dynamic loading of modules on the Web
413                    #[cfg(web)]
414                    hash_map::Entry::Vacant(_) => {
415                        drop(this);
416                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
417                            id,
418                        )));
419                    }
420                    #[cfg(not(web))]
421                    hash_map::Entry::Vacant(entry) => {
422                        let (code, description) = self
423                            .execution_state_sender
424                            .send_request(move |callback| ExecutionRequest::LoadContract {
425                                id,
426                                callback,
427                            })?
428                            .recv_response()?;
429                        entry.insert((code, description)).clone()
430                    }
431                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
432                };
433                let instance = code.instantiate(this)?;
434
435                self.applications_to_finalize.push(id);
436                Ok(entry
437                    .insert(LoadedApplication::new(instance, description))
438                    .clone())
439            }
440        }
441    }
442
443    /// Configures the runtime for executing a call to a different contract.
444    fn prepare_for_call(
445        &mut self,
446        this: ContractSyncRuntimeHandle,
447        authenticated: bool,
448        callee_id: ApplicationId,
449    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
450        self.check_for_reentrancy(callee_id)?;
451
452        ensure!(
453            !self.is_finalizing,
454            ExecutionError::CrossApplicationCallInFinalize {
455                caller_id: Box::new(self.current_application().id),
456                callee_id: Box::new(callee_id),
457            }
458        );
459
460        // Load the application.
461        let application = self.load_contract_instance(this, callee_id)?;
462
463        let caller = self.current_application();
464        let caller_id = caller.id;
465        let caller_signer = caller.signer;
466        // Make the call to user code.
467        let authenticated_owner = match caller_signer {
468            Some(signer) if authenticated => Some(signer),
469            _ => None,
470        };
471        let authenticated_caller_id = authenticated.then_some(caller_id);
472        self.push_application(ApplicationStatus {
473            caller_id: authenticated_caller_id,
474            id: callee_id,
475            description: application.description,
476            // Allow further nested calls to be authenticated if this one is.
477            signer: authenticated_owner,
478        });
479        Ok(application.instance)
480    }
481
482    /// Cleans up the runtime after the execution of a call to a different contract.
483    fn finish_call(&mut self) {
484        self.pop_application();
485    }
486
487    /// Runs the service in a separate thread as an oracle.
488    fn run_service_oracle_query(
489        &mut self,
490        application_id: ApplicationId,
491        query: Vec<u8>,
492    ) -> Result<Vec<u8>, ExecutionError> {
493        let timeout = self
494            .resource_controller
495            .remaining_service_oracle_execution_time()?;
496        let execution_start = Instant::now();
497        let deadline = Some(execution_start + timeout);
498        let response = self
499            .execution_state_sender
500            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
501                deadline,
502                application_id,
503                next_block_height: self.height,
504                query,
505                callback,
506            })?
507            .recv_response()?;
508
509        self.resource_controller
510            .track_service_oracle_execution(execution_start.elapsed())?;
511        self.resource_controller
512            .track_service_oracle_response(response.len())?;
513
514        Ok(response)
515    }
516}
517
518impl SyncRuntimeInternal<UserServiceInstance> {
519    /// Initializes a service instance with this runtime.
520    fn load_service_instance(
521        &mut self,
522        this: SyncRuntimeHandle<UserServiceInstance>,
523        id: ApplicationId,
524    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
525        match self.loaded_applications.entry(id) {
526            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
527
528            hash_map::Entry::Vacant(entry) => {
529                // First time actually using the application. Let's see if the code was
530                // pre-loaded.
531                let (code, description) = match self.preloaded_applications.entry(id) {
532                    // TODO(#2927): support dynamic loading of modules on the Web
533                    #[cfg(web)]
534                    hash_map::Entry::Vacant(_) => {
535                        drop(this);
536                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
537                            id,
538                        )));
539                    }
540                    #[cfg(not(web))]
541                    hash_map::Entry::Vacant(entry) => {
542                        let (code, description) = self
543                            .execution_state_sender
544                            .send_request(move |callback| ExecutionRequest::LoadService {
545                                id,
546                                callback,
547                            })?
548                            .recv_response()?;
549                        entry.insert((code, description)).clone()
550                    }
551                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
552                };
553                let instance = code.instantiate(this)?;
554
555                self.applications_to_finalize.push(id);
556                Ok(entry
557                    .insert(LoadedApplication::new(instance, description))
558                    .clone())
559            }
560        }
561    }
562}
563
564impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
565    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
566        let handle = self.0.take().expect(
567            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
568        );
569        let runtime = Arc::into_inner(handle.0)?
570            .into_inner()
571            .expect("`SyncRuntime` should run in a single thread which should not panic");
572        Some(runtime)
573    }
574}
575
576impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
577    for SyncRuntimeHandle<UserInstance>
578{
579    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
580        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
581    }
582}
583
584impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
585    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
586        self.0
587            .try_lock()
588            .expect("Synchronous runtimes run on a single execution thread")
589    }
590}
591
592impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
593where
594    Self: ContractOrServiceRuntime,
595{
596    type Read = ();
597    type ReadValueBytes = u32;
598    type ContainsKey = u32;
599    type ContainsKeys = u32;
600    type ReadMultiValuesBytes = u32;
601    type FindKeysByPrefix = u32;
602    type FindKeyValuesByPrefix = u32;
603
604    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
605        let mut this = self.inner();
606        let chain_id = this.chain_id;
607        this.resource_controller.track_runtime_chain_id()?;
608        Ok(chain_id)
609    }
610
611    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
612        let mut this = self.inner();
613        let height = this.height;
614        this.resource_controller.track_runtime_block_height()?;
615        Ok(height)
616    }
617
618    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
619        let mut this = self.inner();
620        let application_id = this.current_application().id;
621        this.resource_controller.track_runtime_application_id()?;
622        Ok(application_id)
623    }
624
625    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
626        let mut this = self.inner();
627        let application_creator_chain_id = this.current_application().description.creator_chain_id;
628        this.resource_controller.track_runtime_application_id()?;
629        Ok(application_creator_chain_id)
630    }
631
632    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
633        let mut this = self.inner();
634        let parameters = this.current_application().description.parameters.clone();
635        this.resource_controller
636            .track_runtime_application_parameters(&parameters)?;
637        Ok(parameters)
638    }
639
640    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
641        let mut this = self.inner();
642        let timestamp = this
643            .execution_state_sender
644            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
645            .recv_response()?;
646        this.resource_controller.track_runtime_timestamp()?;
647        Ok(timestamp)
648    }
649
650    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
651        let mut this = self.inner();
652        let balance = this
653            .execution_state_sender
654            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
655            .recv_response()?;
656        this.resource_controller.track_runtime_balance()?;
657        Ok(balance)
658    }
659
660    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
661        let mut this = self.inner();
662        let balance = this
663            .execution_state_sender
664            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
665            .recv_response()?;
666        this.resource_controller.track_runtime_balance()?;
667        Ok(balance)
668    }
669
670    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
671        let mut this = self.inner();
672        let owner_balances = this
673            .execution_state_sender
674            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
675            .recv_response()?;
676        this.resource_controller
677            .track_runtime_owner_balances(&owner_balances)?;
678        Ok(owner_balances)
679    }
680
681    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
682        let mut this = self.inner();
683        let owners = this
684            .execution_state_sender
685            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
686            .recv_response()?;
687        this.resource_controller.track_runtime_owners(&owners)?;
688        Ok(owners)
689    }
690
691    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
692        let mut this = self.inner();
693        let chain_ownership = this
694            .execution_state_sender
695            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
696            .recv_response()?;
697        this.resource_controller
698            .track_runtime_chain_ownership(&chain_ownership)?;
699        Ok(chain_ownership)
700    }
701
702    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
703        let mut this = self.inner();
704        let id = this.current_application().id;
705        this.resource_controller.track_read_operation()?;
706        let receiver = this
707            .execution_state_sender
708            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
709        let state = this.view_user_states.entry(id).or_default();
710        state.contains_key_queries.register(receiver)
711    }
712
713    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
714        let mut this = self.inner();
715        let id = this.current_application().id;
716        let state = this.view_user_states.entry(id).or_default();
717        let value = state.contains_key_queries.wait(*promise)?;
718        Ok(value)
719    }
720
721    fn contains_keys_new(
722        &mut self,
723        keys: Vec<Vec<u8>>,
724    ) -> Result<Self::ContainsKeys, ExecutionError> {
725        let mut this = self.inner();
726        let id = this.current_application().id;
727        this.resource_controller.track_read_operation()?;
728        let receiver = this
729            .execution_state_sender
730            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
731        let state = this.view_user_states.entry(id).or_default();
732        state.contains_keys_queries.register(receiver)
733    }
734
735    fn contains_keys_wait(
736        &mut self,
737        promise: &Self::ContainsKeys,
738    ) -> Result<Vec<bool>, ExecutionError> {
739        let mut this = self.inner();
740        let id = this.current_application().id;
741        let state = this.view_user_states.entry(id).or_default();
742        let value = state.contains_keys_queries.wait(*promise)?;
743        Ok(value)
744    }
745
746    fn read_multi_values_bytes_new(
747        &mut self,
748        keys: Vec<Vec<u8>>,
749    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
750        let mut this = self.inner();
751        let id = this.current_application().id;
752        this.resource_controller.track_read_operation()?;
753        let receiver = this.execution_state_sender.send_request(move |callback| {
754            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
755        })?;
756        let state = this.view_user_states.entry(id).or_default();
757        state.read_multi_values_queries.register(receiver)
758    }
759
760    fn read_multi_values_bytes_wait(
761        &mut self,
762        promise: &Self::ReadMultiValuesBytes,
763    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
764        let mut this = self.inner();
765        let id = this.current_application().id;
766        let state = this.view_user_states.entry(id).or_default();
767        let values = state.read_multi_values_queries.wait(*promise)?;
768        for value in &values {
769            if let Some(value) = &value {
770                this.resource_controller
771                    .track_bytes_read(value.len() as u64)?;
772            }
773        }
774        Ok(values)
775    }
776
777    fn read_value_bytes_new(
778        &mut self,
779        key: Vec<u8>,
780    ) -> Result<Self::ReadValueBytes, ExecutionError> {
781        let mut this = self.inner();
782        let id = this.current_application().id;
783        this.resource_controller.track_read_operation()?;
784        let receiver = this
785            .execution_state_sender
786            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
787        let state = this.view_user_states.entry(id).or_default();
788        state.read_value_queries.register(receiver)
789    }
790
791    fn read_value_bytes_wait(
792        &mut self,
793        promise: &Self::ReadValueBytes,
794    ) -> Result<Option<Vec<u8>>, ExecutionError> {
795        let mut this = self.inner();
796        let id = this.current_application().id;
797        let value = {
798            let state = this.view_user_states.entry(id).or_default();
799            state.read_value_queries.wait(*promise)?
800        };
801        if let Some(value) = &value {
802            this.resource_controller
803                .track_bytes_read(value.len() as u64)?;
804        }
805        Ok(value)
806    }
807
808    fn find_keys_by_prefix_new(
809        &mut self,
810        key_prefix: Vec<u8>,
811    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
812        let mut this = self.inner();
813        let id = this.current_application().id;
814        this.resource_controller.track_read_operation()?;
815        let receiver = this.execution_state_sender.send_request(move |callback| {
816            ExecutionRequest::FindKeysByPrefix {
817                id,
818                key_prefix,
819                callback,
820            }
821        })?;
822        let state = this.view_user_states.entry(id).or_default();
823        state.find_keys_queries.register(receiver)
824    }
825
826    fn find_keys_by_prefix_wait(
827        &mut self,
828        promise: &Self::FindKeysByPrefix,
829    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
830        let mut this = self.inner();
831        let id = this.current_application().id;
832        let keys = {
833            let state = this.view_user_states.entry(id).or_default();
834            state.find_keys_queries.wait(*promise)?
835        };
836        let mut read_size = 0;
837        for key in &keys {
838            read_size += key.len();
839        }
840        this.resource_controller
841            .track_bytes_read(read_size as u64)?;
842        Ok(keys)
843    }
844
845    fn find_key_values_by_prefix_new(
846        &mut self,
847        key_prefix: Vec<u8>,
848    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
849        let mut this = self.inner();
850        let id = this.current_application().id;
851        this.resource_controller.track_read_operation()?;
852        let receiver = this.execution_state_sender.send_request(move |callback| {
853            ExecutionRequest::FindKeyValuesByPrefix {
854                id,
855                key_prefix,
856                callback,
857            }
858        })?;
859        let state = this.view_user_states.entry(id).or_default();
860        state.find_key_values_queries.register(receiver)
861    }
862
863    fn find_key_values_by_prefix_wait(
864        &mut self,
865        promise: &Self::FindKeyValuesByPrefix,
866    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
867        let mut this = self.inner();
868        let id = this.current_application().id;
869        let state = this.view_user_states.entry(id).or_default();
870        let key_values = state.find_key_values_queries.wait(*promise)?;
871        let mut read_size = 0;
872        for (key, value) in &key_values {
873            read_size += key.len() + value.len();
874        }
875        this.resource_controller
876            .track_bytes_read(read_size as u64)?;
877        Ok(key_values)
878    }
879
880    fn perform_http_request(
881        &mut self,
882        request: http::Request,
883    ) -> Result<http::Response, ExecutionError> {
884        let mut this = self.inner();
885        let app_permissions = this
886            .execution_state_sender
887            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
888            .recv_response()?;
889
890        let app_id = this.current_application().id;
891        ensure!(
892            app_permissions.can_make_http_requests(&app_id),
893            ExecutionError::UnauthorizedApplication(app_id)
894        );
895
896        this.resource_controller.track_http_request()?;
897
898        this.execution_state_sender
899            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
900                request,
901                http_responses_are_oracle_responses:
902                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
903                callback,
904            })?
905            .recv_response()
906    }
907
908    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
909        let this = self.inner();
910        this.execution_state_sender
911            .send_request(|callback| ExecutionRequest::AssertBefore {
912                timestamp,
913                callback,
914            })?
915            .recv_response()?
916    }
917
918    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
919        let this = self.inner();
920        let blob_id = hash.into();
921        let content = this
922            .execution_state_sender
923            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
924            .recv_response()?;
925        Ok(content.into_vec_or_clone())
926    }
927
928    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
929        let this = self.inner();
930        let blob_id = hash.into();
931        this.execution_state_sender
932            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
933            .recv_response()
934    }
935
936    fn has_empty_storage(&mut self, application: ApplicationId) -> Result<bool, ExecutionError> {
937        let this = self.inner();
938        let (key_size, value_size) = this
939            .execution_state_sender
940            .send_request(move |callback| ExecutionRequest::TotalStorageSize {
941                application,
942                callback,
943            })?
944            .recv_response()?;
945        Ok(key_size + value_size == 0)
946    }
947
948    fn maximum_blob_size(&mut self) -> Result<u64, ExecutionError> {
949        Ok(self.inner().resource_controller.policy().maximum_blob_size)
950    }
951
952    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
953        Ok(self.inner().allow_application_logs)
954    }
955}
956
957/// An extension trait to determine in compile time the different behaviors between contract and
958/// services in the implementation of [`BaseRuntime`].
959trait ContractOrServiceRuntime {
960    /// Configured to `true` if the HTTP response size should be limited to the oracle response
961    /// size.
962    ///
963    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
964    /// and only storing in the block a shorter oracle response.
965    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
966}
967
968impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
969    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
970}
971
972impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
973    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
974}
975
976impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
977    fn clone(&self) -> Self {
978        SyncRuntimeHandle(self.0.clone())
979    }
980}
981
982impl ContractSyncRuntime {
983    pub(crate) fn new(
984        execution_state_sender: ExecutionStateSender,
985        chain_id: ChainId,
986        refund_grant_to: Option<Account>,
987        resource_controller: ResourceController,
988        action: &UserAction,
989        allow_application_logs: bool,
990    ) -> Self {
991        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
992            SyncRuntimeInternal::new(
993                chain_id,
994                action.height(),
995                action.round(),
996                if let UserAction::Message(context, _) = action {
997                    Some(context.into())
998                } else {
999                    None
1000                },
1001                execution_state_sender,
1002                None,
1003                refund_grant_to,
1004                resource_controller,
1005                action.timestamp(),
1006                allow_application_logs,
1007            ),
1008        )))
1009    }
1010
1011    /// Preloads the code of a contract into the runtime's memory.
1012    pub(crate) fn preload_contract(
1013        &self,
1014        id: ApplicationId,
1015        code: UserContractCode,
1016        description: ApplicationDescription,
1017    ) -> Result<(), ExecutionError> {
1018        let this = self
1019            .0
1020            .as_ref()
1021            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1022        let mut this_guard = this.inner();
1023
1024        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1025            entry.insert((code, description));
1026        }
1027
1028        Ok(())
1029    }
1030
1031    /// Main entry point to start executing a user action.
1032    pub(crate) fn run_action(
1033        mut self,
1034        application_id: ApplicationId,
1035        chain_id: ChainId,
1036        action: UserAction,
1037    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1038        let result = self
1039            .deref_mut()
1040            .run_action(application_id, chain_id, action)?;
1041        let runtime = self
1042            .into_inner()
1043            .expect("Runtime clones should have been freed by now");
1044
1045        Ok((result, runtime.resource_controller))
1046    }
1047}
1048
1049impl ContractSyncRuntimeHandle {
1050    fn run_action(
1051        &mut self,
1052        application_id: ApplicationId,
1053        chain_id: ChainId,
1054        action: UserAction,
1055    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1056        let finalize_context = FinalizeContext {
1057            authenticated_owner: action.signer(),
1058            chain_id,
1059            height: action.height(),
1060            round: action.round(),
1061        };
1062
1063        {
1064            let runtime = self.inner();
1065            assert_eq!(runtime.chain_id, chain_id);
1066            assert_eq!(runtime.height, action.height());
1067        }
1068
1069        let signer = action.signer();
1070        let closure = move |code: &mut UserContractInstance| match action {
1071            UserAction::Instantiate(_context, argument) => {
1072                code.instantiate(argument).map(|()| None)
1073            }
1074            UserAction::Operation(_context, operation) => {
1075                code.execute_operation(operation).map(Option::Some)
1076            }
1077            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1078            UserAction::ProcessStreams(_context, updates) => {
1079                code.process_streams(updates).map(|()| None)
1080            }
1081        };
1082
1083        let result = self.execute(application_id, signer, closure)?;
1084        self.finalize(finalize_context)?;
1085        Ok(result)
1086    }
1087
1088    /// Notifies all loaded applications that execution is finalizing.
1089    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1090        let applications = mem::take(&mut self.inner().applications_to_finalize)
1091            .into_iter()
1092            .rev();
1093
1094        self.inner().is_finalizing = true;
1095
1096        for application in applications {
1097            self.execute(application, context.authenticated_owner, |contract| {
1098                contract.finalize().map(|_| None)
1099            })?;
1100            self.inner().loaded_applications.remove(&application);
1101        }
1102
1103        Ok(())
1104    }
1105
1106    /// Executes a `closure` with the contract code for the `application_id`.
1107    fn execute(
1108        &mut self,
1109        application_id: ApplicationId,
1110        signer: Option<AccountOwner>,
1111        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1112    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1113        let contract = {
1114            let mut runtime = self.inner();
1115            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1116
1117            let status = ApplicationStatus {
1118                caller_id: None,
1119                id: application_id,
1120                description: application.description.clone(),
1121                signer,
1122            };
1123
1124            runtime.push_application(status);
1125
1126            application
1127        };
1128
1129        let result = closure(
1130            &mut contract
1131                .instance
1132                .try_lock()
1133                .expect("Application should not be already executing"),
1134        )?;
1135
1136        let mut runtime = self.inner();
1137        let application_status = runtime.pop_application();
1138        assert_eq!(application_status.caller_id, None);
1139        assert_eq!(application_status.id, application_id);
1140        assert_eq!(application_status.description, contract.description);
1141        assert_eq!(application_status.signer, signer);
1142        assert!(runtime.call_stack.is_empty());
1143
1144        Ok(result)
1145    }
1146}
1147
1148impl ContractRuntime for ContractSyncRuntimeHandle {
1149    fn authenticated_owner(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1150        let this = self.inner();
1151        Ok(this.current_application().signer)
1152    }
1153
1154    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1155        Ok(self
1156            .inner()
1157            .executing_message
1158            .map(|metadata| metadata.is_bouncing))
1159    }
1160
1161    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1162        Ok(self
1163            .inner()
1164            .executing_message
1165            .map(|metadata| metadata.origin))
1166    }
1167
1168    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1169        let this = self.inner();
1170        if this.call_stack.len() <= 1 {
1171            return Ok(None);
1172        }
1173        Ok(this.current_application().caller_id)
1174    }
1175
1176    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1177        Ok(match vm_runtime {
1178            VmRuntime::Wasm => {
1179                self.inner()
1180                    .resource_controller
1181                    .policy()
1182                    .maximum_wasm_fuel_per_block
1183            }
1184            VmRuntime::Evm => {
1185                self.inner()
1186                    .resource_controller
1187                    .policy()
1188                    .maximum_evm_fuel_per_block
1189            }
1190        })
1191    }
1192
1193    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1194        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1195    }
1196
1197    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1198        let mut this = self.inner();
1199        this.resource_controller.track_fuel(fuel, vm_runtime)
1200    }
1201
1202    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1203        let mut this = self.inner();
1204        let application = this.current_application();
1205        let application_id = application.id;
1206        let authenticated_owner = application.signer;
1207        let mut refund_grant_to = this.refund_grant_to;
1208
1209        let grant = this
1210            .resource_controller
1211            .policy()
1212            .total_price(&message.grant)?;
1213        if grant.is_zero() {
1214            refund_grant_to = None;
1215        } else {
1216            this.resource_controller.track_grant(grant)?;
1217        }
1218        let kind = if message.is_tracked {
1219            MessageKind::Tracked
1220        } else {
1221            MessageKind::Simple
1222        };
1223
1224        this.execution_state_sender
1225            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1226                message: OutgoingMessage {
1227                    destination: message.destination,
1228                    authenticated_owner,
1229                    refund_grant_to,
1230                    grant,
1231                    kind,
1232                    message: Message::User {
1233                        application_id,
1234                        bytes: message.message,
1235                    },
1236                },
1237                callback,
1238            })?
1239            .recv_response()?;
1240
1241        Ok(())
1242    }
1243
1244    fn transfer(
1245        &mut self,
1246        source: AccountOwner,
1247        destination: Account,
1248        amount: Amount,
1249    ) -> Result<(), ExecutionError> {
1250        let this = self.inner();
1251        let current_application = this.current_application();
1252        let application_id = current_application.id;
1253        let signer = current_application.signer;
1254
1255        this.execution_state_sender
1256            .send_request(|callback| ExecutionRequest::Transfer {
1257                source,
1258                destination,
1259                amount,
1260                signer,
1261                application_id,
1262                callback,
1263            })?
1264            .recv_response()?;
1265        Ok(())
1266    }
1267
1268    fn claim(
1269        &mut self,
1270        source: Account,
1271        destination: Account,
1272        amount: Amount,
1273    ) -> Result<(), ExecutionError> {
1274        let this = self.inner();
1275        let current_application = this.current_application();
1276        let application_id = current_application.id;
1277        let signer = current_application.signer;
1278
1279        this.execution_state_sender
1280            .send_request(|callback| ExecutionRequest::Claim {
1281                source,
1282                destination,
1283                amount,
1284                signer,
1285                application_id,
1286                callback,
1287            })?
1288            .recv_response()?;
1289        Ok(())
1290    }
1291
1292    fn try_call_application(
1293        &mut self,
1294        authenticated: bool,
1295        callee_id: ApplicationId,
1296        argument: Vec<u8>,
1297    ) -> Result<Vec<u8>, ExecutionError> {
1298        let contract = self
1299            .inner()
1300            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1301
1302        let value = contract
1303            .try_lock()
1304            .expect("Applications should not have reentrant calls")
1305            .execute_operation(argument)?;
1306
1307        self.inner().finish_call();
1308
1309        Ok(value)
1310    }
1311
1312    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1313        let mut this = self.inner();
1314        ensure!(
1315            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1316            ExecutionError::StreamNameTooLong
1317        );
1318        let application_id = GenericApplicationId::User(this.current_application().id);
1319        let stream_id = StreamId {
1320            stream_name,
1321            application_id,
1322        };
1323        let value_len = value.len() as u64;
1324        let index = this
1325            .execution_state_sender
1326            .send_request(|callback| ExecutionRequest::Emit {
1327                stream_id,
1328                value,
1329                callback,
1330            })?
1331            .recv_response()?;
1332        // TODO(#365): Consider separate event fee categories.
1333        this.resource_controller.track_bytes_written(value_len)?;
1334        Ok(index)
1335    }
1336
1337    fn read_event(
1338        &mut self,
1339        chain_id: ChainId,
1340        stream_name: StreamName,
1341        index: u32,
1342    ) -> Result<Vec<u8>, ExecutionError> {
1343        let mut this = self.inner();
1344        ensure!(
1345            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1346            ExecutionError::StreamNameTooLong
1347        );
1348        let application_id = GenericApplicationId::User(this.current_application().id);
1349        let stream_id = StreamId {
1350            stream_name,
1351            application_id,
1352        };
1353        let event_id = EventId {
1354            stream_id,
1355            index,
1356            chain_id,
1357        };
1358        let event = this
1359            .execution_state_sender
1360            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1361            .recv_response()?;
1362        // TODO(#365): Consider separate event fee categories.
1363        this.resource_controller
1364            .track_bytes_read(event.len() as u64)?;
1365        Ok(event)
1366    }
1367
1368    fn subscribe_to_events(
1369        &mut self,
1370        chain_id: ChainId,
1371        application_id: ApplicationId,
1372        stream_name: StreamName,
1373    ) -> Result<(), ExecutionError> {
1374        let this = self.inner();
1375        ensure!(
1376            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1377            ExecutionError::StreamNameTooLong
1378        );
1379        let stream_id = StreamId {
1380            stream_name,
1381            application_id: application_id.into(),
1382        };
1383        let subscriber_app_id = this.current_application().id;
1384        this.execution_state_sender
1385            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1386                chain_id,
1387                stream_id,
1388                subscriber_app_id,
1389                callback,
1390            })?
1391            .recv_response()?;
1392        Ok(())
1393    }
1394
1395    fn unsubscribe_from_events(
1396        &mut self,
1397        chain_id: ChainId,
1398        application_id: ApplicationId,
1399        stream_name: StreamName,
1400    ) -> Result<(), ExecutionError> {
1401        let this = self.inner();
1402        ensure!(
1403            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1404            ExecutionError::StreamNameTooLong
1405        );
1406        let stream_id = StreamId {
1407            stream_name,
1408            application_id: application_id.into(),
1409        };
1410        let subscriber_app_id = this.current_application().id;
1411        this.execution_state_sender
1412            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1413                chain_id,
1414                stream_id,
1415                subscriber_app_id,
1416                callback,
1417            })?
1418            .recv_response()?;
1419        Ok(())
1420    }
1421
1422    fn query_service(
1423        &mut self,
1424        application_id: ApplicationId,
1425        query: Vec<u8>,
1426    ) -> Result<Vec<u8>, ExecutionError> {
1427        let mut this = self.inner();
1428
1429        let app_permissions = this
1430            .execution_state_sender
1431            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1432            .recv_response()?;
1433
1434        let app_id = this.current_application().id;
1435        ensure!(
1436            app_permissions.can_call_services(&app_id),
1437            ExecutionError::UnauthorizedApplication(app_id)
1438        );
1439
1440        this.resource_controller.track_service_oracle_call()?;
1441
1442        this.run_service_oracle_query(application_id, query)
1443    }
1444
1445    fn open_chain(
1446        &mut self,
1447        ownership: ChainOwnership,
1448        application_permissions: ApplicationPermissions,
1449        balance: Amount,
1450    ) -> Result<ChainId, ExecutionError> {
1451        let parent_id = self.inner().chain_id;
1452        let block_height = self.block_height()?;
1453
1454        let timestamp = self.inner().user_context;
1455
1456        let chain_id = self
1457            .inner()
1458            .execution_state_sender
1459            .send_request(|callback| ExecutionRequest::OpenChain {
1460                ownership,
1461                balance,
1462                parent_id,
1463                block_height,
1464                timestamp,
1465                application_permissions,
1466                callback,
1467            })?
1468            .recv_response()?;
1469
1470        Ok(chain_id)
1471    }
1472
1473    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1474        let this = self.inner();
1475        let application_id = this.current_application().id;
1476        this.execution_state_sender
1477            .send_request(|callback| ExecutionRequest::CloseChain {
1478                application_id,
1479                callback,
1480            })?
1481            .recv_response()?
1482    }
1483
1484    fn change_application_permissions(
1485        &mut self,
1486        application_permissions: ApplicationPermissions,
1487    ) -> Result<(), ExecutionError> {
1488        let this = self.inner();
1489        let application_id = this.current_application().id;
1490        this.execution_state_sender
1491            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1492                application_id,
1493                application_permissions,
1494                callback,
1495            })?
1496            .recv_response()?
1497    }
1498
1499    fn peek_application_index(&mut self) -> Result<u32, ExecutionError> {
1500        let index = self
1501            .inner()
1502            .execution_state_sender
1503            .send_request(move |callback| ExecutionRequest::PeekApplicationIndex { callback })?
1504            .recv_response()?;
1505        Ok(index)
1506    }
1507
1508    fn create_application(
1509        &mut self,
1510        module_id: ModuleId,
1511        parameters: Vec<u8>,
1512        argument: Vec<u8>,
1513        required_application_ids: Vec<ApplicationId>,
1514    ) -> Result<ApplicationId, ExecutionError> {
1515        let chain_id = self.inner().chain_id;
1516        let block_height = self.block_height()?;
1517
1518        let CreateApplicationResult { app_id } = self
1519            .inner()
1520            .execution_state_sender
1521            .send_request(move |callback| ExecutionRequest::CreateApplication {
1522                chain_id,
1523                block_height,
1524                module_id,
1525                parameters,
1526                required_application_ids,
1527                callback,
1528            })?
1529            .recv_response()?;
1530
1531        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1532
1533        contract
1534            .try_lock()
1535            .expect("Applications should not have reentrant calls")
1536            .instantiate(argument)?;
1537
1538        self.inner().finish_call();
1539
1540        Ok(app_id)
1541    }
1542
1543    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1544        let blob = Blob::new_data(bytes);
1545        let blob_id = blob.id();
1546        let this = self.inner();
1547        this.execution_state_sender
1548            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1549            .recv_response()?;
1550        Ok(DataBlobHash(blob_id.hash))
1551    }
1552
1553    fn publish_module(
1554        &mut self,
1555        contract: Bytecode,
1556        service: Bytecode,
1557        vm_runtime: VmRuntime,
1558    ) -> Result<ModuleId, ExecutionError> {
1559        let (blobs, module_id) =
1560            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1561        let this = self.inner();
1562        for blob in blobs {
1563            this.execution_state_sender
1564                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1565                .recv_response()?;
1566        }
1567        Ok(module_id)
1568    }
1569
1570    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1571        let this = self.inner();
1572        let round = this.round;
1573        this.execution_state_sender
1574            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1575            .recv_response()
1576    }
1577
1578    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1579        let mut this = self.inner();
1580        let id = this.current_application().id;
1581        let state = this.view_user_states.entry(id).or_default();
1582        state.force_all_pending_queries()?;
1583        this.resource_controller.track_write_operations(
1584            batch
1585                .num_operations()
1586                .try_into()
1587                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1588        )?;
1589        this.resource_controller
1590            .track_bytes_written(batch.size() as u64)?;
1591        this.execution_state_sender
1592            .send_request(|callback| ExecutionRequest::WriteBatch {
1593                id,
1594                batch,
1595                callback,
1596            })?
1597            .recv_response()?;
1598        Ok(())
1599    }
1600}
1601
1602impl ServiceSyncRuntime {
1603    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1604    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1605        Self::new_with_deadline(execution_state_sender, context, None)
1606    }
1607
1608    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1609    pub fn new_with_deadline(
1610        execution_state_sender: ExecutionStateSender,
1611        context: QueryContext,
1612        deadline: Option<Instant>,
1613    ) -> Self {
1614        // Query the allow_application_logs setting from the execution state.
1615        let allow_application_logs = execution_state_sender
1616            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1617            .ok()
1618            .and_then(|receiver| receiver.recv_response().ok())
1619            .unwrap_or(false);
1620
1621        let runtime = SyncRuntime(Some(
1622            SyncRuntimeInternal::new(
1623                context.chain_id,
1624                context.next_block_height,
1625                None,
1626                None,
1627                execution_state_sender,
1628                deadline,
1629                None,
1630                ResourceController::default(),
1631                (),
1632                allow_application_logs,
1633            )
1634            .into(),
1635        ));
1636
1637        ServiceSyncRuntime {
1638            runtime,
1639            current_context: context,
1640        }
1641    }
1642
1643    /// Preloads the code of a service into the runtime's memory.
1644    pub(crate) fn preload_service(
1645        &self,
1646        id: ApplicationId,
1647        code: UserServiceCode,
1648        description: ApplicationDescription,
1649    ) -> Result<(), ExecutionError> {
1650        let this = self
1651            .runtime
1652            .0
1653            .as_ref()
1654            .expect("services shouldn't be preloaded while the runtime is being dropped");
1655        let mut this_guard = this.inner();
1656
1657        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1658            entry.insert((code, description));
1659        }
1660
1661        Ok(())
1662    }
1663
1664    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1665    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1666        while let Ok(request) = incoming_requests.recv() {
1667            let ServiceRuntimeRequest::Query {
1668                application_id,
1669                context,
1670                query,
1671                callback,
1672            } = request;
1673
1674            let result = self
1675                .prepare_for_query(context)
1676                .and_then(|()| self.run_query(application_id, query));
1677
1678            if let Err(err) = callback.send(result) {
1679                tracing::debug!(%err, "Receiver for query result has been dropped");
1680            }
1681        }
1682    }
1683
1684    /// Prepares the runtime to query an application.
1685    pub(crate) fn prepare_for_query(
1686        &mut self,
1687        new_context: QueryContext,
1688    ) -> Result<(), ExecutionError> {
1689        let expected_context = QueryContext {
1690            local_time: new_context.local_time,
1691            ..self.current_context
1692        };
1693
1694        if new_context != expected_context {
1695            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1696            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1697        } else {
1698            self.handle_mut()
1699                .inner()
1700                .execution_state_sender
1701                .send_request(|callback| ExecutionRequest::SetLocalTime {
1702                    local_time: new_context.local_time,
1703                    callback,
1704                })?
1705                .recv_response()?;
1706        }
1707        Ok(())
1708    }
1709
1710    /// Queries an application specified by its [`ApplicationId`].
1711    pub(crate) fn run_query(
1712        &mut self,
1713        application_id: ApplicationId,
1714        query: Vec<u8>,
1715    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1716        let this = self.handle_mut();
1717        let response = this.try_query_application(application_id, query)?;
1718        let operations = mem::take(&mut this.inner().scheduled_operations);
1719
1720        Ok(QueryOutcome {
1721            response,
1722            operations,
1723        })
1724    }
1725
1726    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1727    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1728        self.runtime.0.as_mut().expect(
1729            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1730        )
1731    }
1732}
1733
1734impl ServiceRuntime for ServiceSyncRuntimeHandle {
1735    /// Note that queries are not available from writable contexts.
1736    fn try_query_application(
1737        &mut self,
1738        queried_id: ApplicationId,
1739        argument: Vec<u8>,
1740    ) -> Result<Vec<u8>, ExecutionError> {
1741        let service = {
1742            let mut this = self.inner();
1743
1744            // Load the application.
1745            let application = this.load_service_instance(self.clone(), queried_id)?;
1746            // Make the call to user code.
1747            this.push_application(ApplicationStatus {
1748                caller_id: None,
1749                id: queried_id,
1750                description: application.description,
1751                signer: None,
1752            });
1753            application.instance
1754        };
1755        let response = service
1756            .try_lock()
1757            .expect("Applications should not have reentrant calls")
1758            .handle_query(argument)?;
1759        self.inner().pop_application();
1760        Ok(response)
1761    }
1762
1763    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1764        let mut this = self.inner();
1765        let application_id = this.current_application().id;
1766
1767        this.scheduled_operations.push(Operation::User {
1768            application_id,
1769            bytes: operation,
1770        });
1771
1772        Ok(())
1773    }
1774
1775    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1776        if let Some(deadline) = self.inner().deadline {
1777            if Instant::now() >= deadline {
1778                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1779            }
1780        }
1781        Ok(())
1782    }
1783}
1784
1785/// A request to the service runtime actor.
1786pub enum ServiceRuntimeRequest {
1787    Query {
1788        application_id: ApplicationId,
1789        context: QueryContext,
1790        query: Vec<u8>,
1791        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1792    },
1793}
1794
1795/// The origin of the execution.
1796#[derive(Clone, Copy, Debug)]
1797struct ExecutingMessage {
1798    is_bouncing: bool,
1799    origin: ChainId,
1800}
1801
1802impl From<&MessageContext> for ExecutingMessage {
1803    fn from(context: &MessageContext) -> Self {
1804        ExecutingMessage {
1805            is_bouncing: context.is_bouncing,
1806            origin: context.origin,
1807        }
1808    }
1809}
1810
1811/// Creates a compressed contract and service bytecode synchronously.
1812pub fn create_bytecode_blobs_sync(
1813    contract: Bytecode,
1814    service: Bytecode,
1815    vm_runtime: VmRuntime,
1816) -> (Vec<Blob>, ModuleId) {
1817    match vm_runtime {
1818        VmRuntime::Wasm => {
1819            let compressed_contract = contract.compress();
1820            let compressed_service = service.compress();
1821            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1822            let service_blob = Blob::new_service_bytecode(compressed_service);
1823            let module_id =
1824                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1825            (vec![contract_blob, service_blob], module_id)
1826        }
1827        VmRuntime::Evm => {
1828            let compressed_contract = contract.compress();
1829            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1830            let module_id = ModuleId::new(
1831                evm_contract_blob.id().hash,
1832                evm_contract_blob.id().hash,
1833                vm_runtime,
1834            );
1835            (vec![evm_contract_blob], module_id)
1836        }
1837    }
1838}