linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29    execution::UserAction,
30    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31    resources::ResourceController,
32    system::CreateApplicationResult,
33    util::{ReceiverExt, UnboundedSenderExt},
34    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
44pub trait WithContext {
45    type UserContext;
46    type Code;
47}
48
49impl WithContext for UserContractInstance {
50    type UserContext = Timestamp;
51    type Code = UserContractCode;
52}
53
54impl WithContext for UserServiceInstance {
55    type UserContext = ();
56    type Code = UserServiceCode;
57}
58
59#[cfg(test)]
60impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
61    type UserContext = ();
62    type Code = ();
63}
64
65#[derive(Debug)]
66pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
67
68pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
69
70pub struct ServiceSyncRuntime {
71    runtime: SyncRuntime<UserServiceInstance>,
72    current_context: QueryContext,
73}
74
75#[derive(Debug)]
76pub struct SyncRuntimeHandle<UserInstance: WithContext>(
77    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
78);
79
80pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
81pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
82
83/// Runtime data tracked during the execution of a transaction on the synchronous thread.
84#[derive(Debug)]
85pub struct SyncRuntimeInternal<UserInstance: WithContext> {
86    /// The current chain ID.
87    chain_id: ChainId,
88    /// The height of the next block that will be added to this chain. During operations
89    /// and messages, this is the current block height.
90    height: BlockHeight,
91    /// The current consensus round. Only available during block validation in multi-leader rounds.
92    round: Option<u32>,
93    /// The current message being executed, if there is one.
94    #[debug(skip_if = Option::is_none)]
95    executing_message: Option<ExecutingMessage>,
96
97    /// How to interact with the storage view of the execution state.
98    execution_state_sender: ExecutionStateSender,
99
100    /// If applications are being finalized.
101    ///
102    /// If [`true`], disables cross-application calls.
103    is_finalizing: bool,
104    /// Applications that need to be finalized.
105    applications_to_finalize: Vec<ApplicationId>,
106
107    /// Application instances loaded in this transaction.
108    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
109    /// Application instances loaded in this transaction.
110    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
111    /// The current stack of application descriptions.
112    call_stack: Vec<ApplicationStatus>,
113    /// The set of the IDs of the applications that are in the `call_stack`.
114    active_applications: HashSet<ApplicationId>,
115    /// The operations scheduled during this query.
116    scheduled_operations: Vec<Operation>,
117
118    /// Track application states based on views.
119    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
120
121    /// The deadline this runtime should finish executing.
122    ///
123    /// Used to limit the execution time of services running as oracles.
124    deadline: Option<Instant>,
125
126    /// Where to send a refund for the unused part of the grant after execution, if any.
127    #[debug(skip_if = Option::is_none)]
128    refund_grant_to: Option<Account>,
129    /// Controller to track fuel and storage consumption.
130    resource_controller: ResourceController,
131    /// Additional context for the runtime.
132    user_context: UserInstance::UserContext,
133    /// Whether contract log messages should be output.
134    allow_application_logs: bool,
135}
136
137/// The runtime status of an application.
138#[derive(Debug)]
139struct ApplicationStatus {
140    /// The caller application ID, if forwarded during the call.
141    caller_id: Option<ApplicationId>,
142    /// The application ID.
143    id: ApplicationId,
144    /// The application description.
145    description: ApplicationDescription,
146    /// The authenticated owner for the execution thread, if any.
147    signer: Option<AccountOwner>,
148}
149
150/// A loaded application instance.
151#[derive(Debug)]
152struct LoadedApplication<Instance> {
153    instance: Arc<Mutex<Instance>>,
154    description: ApplicationDescription,
155}
156
157impl<Instance> LoadedApplication<Instance> {
158    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
159    fn new(instance: Instance, description: ApplicationDescription) -> Self {
160        LoadedApplication {
161            instance: Arc::new(Mutex::new(instance)),
162            description,
163        }
164    }
165}
166
167impl<Instance> Clone for LoadedApplication<Instance> {
168    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
169    // bound
170    fn clone(&self) -> Self {
171        LoadedApplication {
172            instance: self.instance.clone(),
173            description: self.description.clone(),
174        }
175    }
176}
177
178#[derive(Debug)]
179enum Promise<T> {
180    Ready(T),
181    Pending(Receiver<T>),
182}
183
184impl<T> Promise<T> {
185    fn force(&mut self) -> Result<(), ExecutionError> {
186        if let Promise::Pending(receiver) = self {
187            let value = receiver
188                .recv_ref()
189                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
190            *self = Promise::Ready(value);
191        }
192        Ok(())
193    }
194
195    fn read(self) -> Result<T, ExecutionError> {
196        match self {
197            Promise::Pending(receiver) => {
198                let value = receiver.recv_response()?;
199                Ok(value)
200            }
201            Promise::Ready(value) => Ok(value),
202        }
203    }
204}
205
206/// Manages a set of pending queries returning values of type `T`.
207#[derive(Debug, Default)]
208struct QueryManager<T> {
209    /// The queries in progress.
210    pending_queries: BTreeMap<u32, Promise<T>>,
211    /// The number of queries ever registered so far. Used for the index of the next query.
212    query_count: u32,
213    /// The number of active queries.
214    active_query_count: u32,
215}
216
217impl<T> QueryManager<T> {
218    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
219        let id = self.query_count;
220        self.pending_queries.insert(id, Promise::Pending(receiver));
221        self.query_count = self
222            .query_count
223            .checked_add(1)
224            .ok_or(ArithmeticError::Overflow)?;
225        self.active_query_count = self
226            .active_query_count
227            .checked_add(1)
228            .ok_or(ArithmeticError::Overflow)?;
229        Ok(id)
230    }
231
232    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
233        let promise = self
234            .pending_queries
235            .remove(&id)
236            .ok_or(ExecutionError::InvalidPromise)?;
237        let value = promise.read()?;
238        self.active_query_count -= 1;
239        Ok(value)
240    }
241
242    fn force_all(&mut self) -> Result<(), ExecutionError> {
243        for promise in self.pending_queries.values_mut() {
244            promise.force()?;
245        }
246        Ok(())
247    }
248}
249
250type Keys = Vec<Vec<u8>>;
251type Value = Vec<u8>;
252type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
253
254#[derive(Debug, Default)]
255struct ViewUserState {
256    /// The contains-key queries in progress.
257    contains_key_queries: QueryManager<bool>,
258    /// The contains-keys queries in progress.
259    contains_keys_queries: QueryManager<Vec<bool>>,
260    /// The read-value queries in progress.
261    read_value_queries: QueryManager<Option<Value>>,
262    /// The read-multi-values queries in progress.
263    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
264    /// The find-keys queries in progress.
265    find_keys_queries: QueryManager<Keys>,
266    /// The find-key-values queries in progress.
267    find_key_values_queries: QueryManager<KeyValues>,
268}
269
270impl ViewUserState {
271    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
272        self.contains_key_queries.force_all()?;
273        self.contains_keys_queries.force_all()?;
274        self.read_value_queries.force_all()?;
275        self.read_multi_values_queries.force_all()?;
276        self.find_keys_queries.force_all()?;
277        self.find_key_values_queries.force_all()?;
278        Ok(())
279    }
280}
281
282impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
283    type Target = SyncRuntimeHandle<UserInstance>;
284
285    fn deref(&self) -> &Self::Target {
286        self.0.as_ref().expect(
287            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288        )
289    }
290}
291
292impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
293    fn deref_mut(&mut self) -> &mut Self::Target {
294        self.0.as_mut().expect(
295            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
296        )
297    }
298}
299
300impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
301    fn drop(&mut self) {
302        // Ensure the `loaded_applications` are cleared to prevent circular references in
303        // the runtime
304        if let Some(handle) = self.0.take() {
305            handle.inner().loaded_applications.clear();
306        }
307    }
308}
309
310impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
311    #[expect(clippy::too_many_arguments)]
312    fn new(
313        chain_id: ChainId,
314        height: BlockHeight,
315        round: Option<u32>,
316        executing_message: Option<ExecutingMessage>,
317        execution_state_sender: ExecutionStateSender,
318        deadline: Option<Instant>,
319        refund_grant_to: Option<Account>,
320        resource_controller: ResourceController,
321        user_context: UserInstance::UserContext,
322        allow_application_logs: bool,
323    ) -> Self {
324        Self {
325            chain_id,
326            height,
327            round,
328            executing_message,
329            execution_state_sender,
330            is_finalizing: false,
331            applications_to_finalize: Vec::new(),
332            preloaded_applications: HashMap::new(),
333            loaded_applications: HashMap::new(),
334            call_stack: Vec::new(),
335            active_applications: HashSet::new(),
336            view_user_states: BTreeMap::new(),
337            deadline,
338            refund_grant_to,
339            resource_controller,
340            scheduled_operations: Vec::new(),
341            user_context,
342            allow_application_logs,
343        }
344    }
345
346    /// Returns the [`ApplicationStatus`] of the current application.
347    ///
348    /// The current application is the last to be pushed to the `call_stack`.
349    ///
350    /// # Panics
351    ///
352    /// If the call stack is empty.
353    fn current_application(&self) -> &ApplicationStatus {
354        self.call_stack
355            .last()
356            .expect("Call stack is unexpectedly empty")
357    }
358
359    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
360    ///
361    /// Ensures the application's ID is also tracked in the `active_applications` set.
362    fn push_application(&mut self, status: ApplicationStatus) {
363        self.active_applications.insert(status.id);
364        self.call_stack.push(status);
365    }
366
367    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
368    ///
369    /// Ensures the application's ID is also removed from the `active_applications` set.
370    ///
371    /// # Panics
372    ///
373    /// If the call stack is empty.
374    fn pop_application(&mut self) -> ApplicationStatus {
375        let status = self
376            .call_stack
377            .pop()
378            .expect("Can't remove application from empty call stack");
379        assert!(self.active_applications.remove(&status.id));
380        status
381    }
382
383    /// Ensures that a call to `application_id` is not-reentrant.
384    ///
385    /// Returns an error if there already is an entry for `application_id` in the call stack.
386    fn check_for_reentrancy(
387        &mut self,
388        application_id: ApplicationId,
389    ) -> Result<(), ExecutionError> {
390        ensure!(
391            !self.active_applications.contains(&application_id),
392            ExecutionError::ReentrantCall(application_id)
393        );
394        Ok(())
395    }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399    /// Loads a contract instance, initializing it with this runtime if needed.
400    fn load_contract_instance(
401        &mut self,
402        this: SyncRuntimeHandle<UserContractInstance>,
403        id: ApplicationId,
404    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405        match self.loaded_applications.entry(id) {
406            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
407
408            hash_map::Entry::Vacant(entry) => {
409                // First time actually using the application. Let's see if the code was
410                // pre-loaded.
411                let (code, description) = match self.preloaded_applications.entry(id) {
412                    // TODO(#2927): support dynamic loading of modules on the Web
413                    #[cfg(web)]
414                    hash_map::Entry::Vacant(_) => {
415                        drop(this);
416                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
417                            id,
418                        )));
419                    }
420                    #[cfg(not(web))]
421                    hash_map::Entry::Vacant(entry) => {
422                        let (code, description) = self
423                            .execution_state_sender
424                            .send_request(move |callback| ExecutionRequest::LoadContract {
425                                id,
426                                callback,
427                            })?
428                            .recv_response()?;
429                        entry.insert((code, description)).clone()
430                    }
431                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
432                };
433                let instance = code.instantiate(this)?;
434
435                self.applications_to_finalize.push(id);
436                Ok(entry
437                    .insert(LoadedApplication::new(instance, description))
438                    .clone())
439            }
440        }
441    }
442
443    /// Configures the runtime for executing a call to a different contract.
444    fn prepare_for_call(
445        &mut self,
446        this: ContractSyncRuntimeHandle,
447        authenticated: bool,
448        callee_id: ApplicationId,
449    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
450        self.check_for_reentrancy(callee_id)?;
451
452        ensure!(
453            !self.is_finalizing,
454            ExecutionError::CrossApplicationCallInFinalize {
455                caller_id: Box::new(self.current_application().id),
456                callee_id: Box::new(callee_id),
457            }
458        );
459
460        // Load the application.
461        let application = self.load_contract_instance(this, callee_id)?;
462
463        let caller = self.current_application();
464        let caller_id = caller.id;
465        let caller_signer = caller.signer;
466        // Make the call to user code.
467        let authenticated_owner = match caller_signer {
468            Some(signer) if authenticated => Some(signer),
469            _ => None,
470        };
471        let authenticated_caller_id = authenticated.then_some(caller_id);
472        self.push_application(ApplicationStatus {
473            caller_id: authenticated_caller_id,
474            id: callee_id,
475            description: application.description,
476            // Allow further nested calls to be authenticated if this one is.
477            signer: authenticated_owner,
478        });
479        Ok(application.instance)
480    }
481
482    /// Cleans up the runtime after the execution of a call to a different contract.
483    fn finish_call(&mut self) {
484        self.pop_application();
485    }
486
487    /// Runs the service in a separate thread as an oracle.
488    fn run_service_oracle_query(
489        &mut self,
490        application_id: ApplicationId,
491        query: Vec<u8>,
492    ) -> Result<Vec<u8>, ExecutionError> {
493        let timeout = self
494            .resource_controller
495            .remaining_service_oracle_execution_time()?;
496        let execution_start = Instant::now();
497        let deadline = Some(execution_start + timeout);
498        let response = self
499            .execution_state_sender
500            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
501                deadline,
502                application_id,
503                next_block_height: self.height,
504                query,
505                callback,
506            })?
507            .recv_response()?;
508
509        self.resource_controller
510            .track_service_oracle_execution(execution_start.elapsed())?;
511        self.resource_controller
512            .track_service_oracle_response(response.len())?;
513
514        Ok(response)
515    }
516}
517
518impl SyncRuntimeInternal<UserServiceInstance> {
519    /// Initializes a service instance with this runtime.
520    fn load_service_instance(
521        &mut self,
522        this: SyncRuntimeHandle<UserServiceInstance>,
523        id: ApplicationId,
524    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
525        match self.loaded_applications.entry(id) {
526            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
527
528            hash_map::Entry::Vacant(entry) => {
529                // First time actually using the application. Let's see if the code was
530                // pre-loaded.
531                let (code, description) = match self.preloaded_applications.entry(id) {
532                    // TODO(#2927): support dynamic loading of modules on the Web
533                    #[cfg(web)]
534                    hash_map::Entry::Vacant(_) => {
535                        drop(this);
536                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
537                            id,
538                        )));
539                    }
540                    #[cfg(not(web))]
541                    hash_map::Entry::Vacant(entry) => {
542                        let (code, description) = self
543                            .execution_state_sender
544                            .send_request(move |callback| ExecutionRequest::LoadService {
545                                id,
546                                callback,
547                            })?
548                            .recv_response()?;
549                        entry.insert((code, description)).clone()
550                    }
551                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
552                };
553                let instance = code.instantiate(this)?;
554
555                self.applications_to_finalize.push(id);
556                Ok(entry
557                    .insert(LoadedApplication::new(instance, description))
558                    .clone())
559            }
560        }
561    }
562}
563
564impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
565    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
566        let handle = self.0.take().expect(
567            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
568        );
569        let runtime = Arc::into_inner(handle.0)?
570            .into_inner()
571            .expect("`SyncRuntime` should run in a single thread which should not panic");
572        Some(runtime)
573    }
574}
575
576impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
577    for SyncRuntimeHandle<UserInstance>
578{
579    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
580        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
581    }
582}
583
584impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
585    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
586        self.0
587            .try_lock()
588            .expect("Synchronous runtimes run on a single execution thread")
589    }
590}
591
592impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
593where
594    Self: ContractOrServiceRuntime,
595{
596    type Read = ();
597    type ReadValueBytes = u32;
598    type ContainsKey = u32;
599    type ContainsKeys = u32;
600    type ReadMultiValuesBytes = u32;
601    type FindKeysByPrefix = u32;
602    type FindKeyValuesByPrefix = u32;
603
604    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
605        let mut this = self.inner();
606        let chain_id = this.chain_id;
607        this.resource_controller.track_runtime_chain_id()?;
608        Ok(chain_id)
609    }
610
611    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
612        let mut this = self.inner();
613        let height = this.height;
614        this.resource_controller.track_runtime_block_height()?;
615        Ok(height)
616    }
617
618    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
619        let mut this = self.inner();
620        let application_id = this.current_application().id;
621        this.resource_controller.track_runtime_application_id()?;
622        Ok(application_id)
623    }
624
625    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
626        let mut this = self.inner();
627        let application_creator_chain_id = this.current_application().description.creator_chain_id;
628        this.resource_controller.track_runtime_application_id()?;
629        Ok(application_creator_chain_id)
630    }
631
632    fn read_application_description(
633        &mut self,
634        application_id: ApplicationId,
635    ) -> Result<ApplicationDescription, ExecutionError> {
636        let mut this = self.inner();
637        let description = this
638            .execution_state_sender
639            .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
640                application_id,
641                callback,
642            })?
643            .recv_response()?;
644        this.resource_controller
645            .track_runtime_application_description(&description)?;
646        Ok(description)
647    }
648
649    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
650        let mut this = self.inner();
651        let parameters = this.current_application().description.parameters.clone();
652        this.resource_controller
653            .track_runtime_application_parameters(&parameters)?;
654        Ok(parameters)
655    }
656
657    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
658        let mut this = self.inner();
659        let timestamp = this
660            .execution_state_sender
661            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
662            .recv_response()?;
663        this.resource_controller.track_runtime_timestamp()?;
664        Ok(timestamp)
665    }
666
667    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
668        let mut this = self.inner();
669        let balance = this
670            .execution_state_sender
671            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
672            .recv_response()?;
673        this.resource_controller.track_runtime_balance()?;
674        Ok(balance)
675    }
676
677    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
678        let mut this = self.inner();
679        let balance = this
680            .execution_state_sender
681            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
682            .recv_response()?;
683        this.resource_controller.track_runtime_balance()?;
684        Ok(balance)
685    }
686
687    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
688        let mut this = self.inner();
689        let owner_balances = this
690            .execution_state_sender
691            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
692            .recv_response()?;
693        this.resource_controller
694            .track_runtime_owner_balances(&owner_balances)?;
695        Ok(owner_balances)
696    }
697
698    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
699        let mut this = self.inner();
700        let owners = this
701            .execution_state_sender
702            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
703            .recv_response()?;
704        this.resource_controller.track_runtime_owners(&owners)?;
705        Ok(owners)
706    }
707
708    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
709        let mut this = self.inner();
710        let chain_ownership = this
711            .execution_state_sender
712            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
713            .recv_response()?;
714        this.resource_controller
715            .track_runtime_chain_ownership(&chain_ownership)?;
716        Ok(chain_ownership)
717    }
718
719    fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
720        let this = self.inner();
721        let application_permissions = this
722            .execution_state_sender
723            .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
724            .recv_response()?;
725        Ok(application_permissions)
726    }
727
728    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
729        let mut this = self.inner();
730        let id = this.current_application().id;
731        this.resource_controller.track_read_operation()?;
732        let receiver = this
733            .execution_state_sender
734            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
735        let state = this.view_user_states.entry(id).or_default();
736        state.contains_key_queries.register(receiver)
737    }
738
739    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
740        let mut this = self.inner();
741        let id = this.current_application().id;
742        let state = this.view_user_states.entry(id).or_default();
743        let value = state.contains_key_queries.wait(*promise)?;
744        Ok(value)
745    }
746
747    fn contains_keys_new(
748        &mut self,
749        keys: Vec<Vec<u8>>,
750    ) -> Result<Self::ContainsKeys, ExecutionError> {
751        let mut this = self.inner();
752        let id = this.current_application().id;
753        this.resource_controller.track_read_operation()?;
754        let receiver = this
755            .execution_state_sender
756            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
757        let state = this.view_user_states.entry(id).or_default();
758        state.contains_keys_queries.register(receiver)
759    }
760
761    fn contains_keys_wait(
762        &mut self,
763        promise: &Self::ContainsKeys,
764    ) -> Result<Vec<bool>, ExecutionError> {
765        let mut this = self.inner();
766        let id = this.current_application().id;
767        let state = this.view_user_states.entry(id).or_default();
768        let value = state.contains_keys_queries.wait(*promise)?;
769        Ok(value)
770    }
771
772    fn read_multi_values_bytes_new(
773        &mut self,
774        keys: Vec<Vec<u8>>,
775    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
776        let mut this = self.inner();
777        let id = this.current_application().id;
778        this.resource_controller.track_read_operation()?;
779        let receiver = this.execution_state_sender.send_request(move |callback| {
780            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
781        })?;
782        let state = this.view_user_states.entry(id).or_default();
783        state.read_multi_values_queries.register(receiver)
784    }
785
786    fn read_multi_values_bytes_wait(
787        &mut self,
788        promise: &Self::ReadMultiValuesBytes,
789    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
790        let mut this = self.inner();
791        let id = this.current_application().id;
792        let state = this.view_user_states.entry(id).or_default();
793        let values = state.read_multi_values_queries.wait(*promise)?;
794        for value in &values {
795            if let Some(value) = &value {
796                this.resource_controller
797                    .track_bytes_read(value.len() as u64)?;
798            }
799        }
800        Ok(values)
801    }
802
803    fn read_value_bytes_new(
804        &mut self,
805        key: Vec<u8>,
806    ) -> Result<Self::ReadValueBytes, ExecutionError> {
807        let mut this = self.inner();
808        let id = this.current_application().id;
809        this.resource_controller.track_read_operation()?;
810        let receiver = this
811            .execution_state_sender
812            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
813        let state = this.view_user_states.entry(id).or_default();
814        state.read_value_queries.register(receiver)
815    }
816
817    fn read_value_bytes_wait(
818        &mut self,
819        promise: &Self::ReadValueBytes,
820    ) -> Result<Option<Vec<u8>>, ExecutionError> {
821        let mut this = self.inner();
822        let id = this.current_application().id;
823        let value = {
824            let state = this.view_user_states.entry(id).or_default();
825            state.read_value_queries.wait(*promise)?
826        };
827        if let Some(value) = &value {
828            this.resource_controller
829                .track_bytes_read(value.len() as u64)?;
830        }
831        Ok(value)
832    }
833
834    fn find_keys_by_prefix_new(
835        &mut self,
836        key_prefix: Vec<u8>,
837    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
838        let mut this = self.inner();
839        let id = this.current_application().id;
840        this.resource_controller.track_read_operation()?;
841        let receiver = this.execution_state_sender.send_request(move |callback| {
842            ExecutionRequest::FindKeysByPrefix {
843                id,
844                key_prefix,
845                callback,
846            }
847        })?;
848        let state = this.view_user_states.entry(id).or_default();
849        state.find_keys_queries.register(receiver)
850    }
851
852    fn find_keys_by_prefix_wait(
853        &mut self,
854        promise: &Self::FindKeysByPrefix,
855    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
856        let mut this = self.inner();
857        let id = this.current_application().id;
858        let keys = {
859            let state = this.view_user_states.entry(id).or_default();
860            state.find_keys_queries.wait(*promise)?
861        };
862        let mut read_size = 0;
863        for key in &keys {
864            read_size += key.len();
865        }
866        this.resource_controller
867            .track_bytes_read(read_size as u64)?;
868        Ok(keys)
869    }
870
871    fn find_key_values_by_prefix_new(
872        &mut self,
873        key_prefix: Vec<u8>,
874    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
875        let mut this = self.inner();
876        let id = this.current_application().id;
877        this.resource_controller.track_read_operation()?;
878        let receiver = this.execution_state_sender.send_request(move |callback| {
879            ExecutionRequest::FindKeyValuesByPrefix {
880                id,
881                key_prefix,
882                callback,
883            }
884        })?;
885        let state = this.view_user_states.entry(id).or_default();
886        state.find_key_values_queries.register(receiver)
887    }
888
889    fn find_key_values_by_prefix_wait(
890        &mut self,
891        promise: &Self::FindKeyValuesByPrefix,
892    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
893        let mut this = self.inner();
894        let id = this.current_application().id;
895        let state = this.view_user_states.entry(id).or_default();
896        let key_values = state.find_key_values_queries.wait(*promise)?;
897        let mut read_size = 0;
898        for (key, value) in &key_values {
899            read_size += key.len() + value.len();
900        }
901        this.resource_controller
902            .track_bytes_read(read_size as u64)?;
903        Ok(key_values)
904    }
905
906    fn perform_http_request(
907        &mut self,
908        request: http::Request,
909    ) -> Result<http::Response, ExecutionError> {
910        let mut this = self.inner();
911        let app_permissions = this
912            .execution_state_sender
913            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
914            .recv_response()?;
915
916        let app_id = this.current_application().id;
917        ensure!(
918            app_permissions.can_make_http_requests(&app_id),
919            ExecutionError::UnauthorizedApplication(app_id)
920        );
921
922        this.resource_controller.track_http_request()?;
923
924        this.execution_state_sender
925            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
926                request,
927                http_responses_are_oracle_responses:
928                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
929                callback,
930            })?
931            .recv_response()
932    }
933
934    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
935        let this = self.inner();
936        this.execution_state_sender
937            .send_request(|callback| ExecutionRequest::AssertBefore {
938                timestamp,
939                callback,
940            })?
941            .recv_response()?
942    }
943
944    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
945        let this = self.inner();
946        let blob_id = hash.into();
947        let content = this
948            .execution_state_sender
949            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
950            .recv_response()?;
951        Ok(content.into_vec_or_clone())
952    }
953
954    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
955        let this = self.inner();
956        let blob_id = hash.into();
957        this.execution_state_sender
958            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
959            .recv_response()
960    }
961
962    fn has_empty_storage(&mut self, application: ApplicationId) -> Result<bool, ExecutionError> {
963        let this = self.inner();
964        let (key_size, value_size) = this
965            .execution_state_sender
966            .send_request(move |callback| ExecutionRequest::TotalStorageSize {
967                application,
968                callback,
969            })?
970            .recv_response()?;
971        Ok(key_size + value_size == 0)
972    }
973
974    fn maximum_blob_size(&mut self) -> Result<u64, ExecutionError> {
975        Ok(self.inner().resource_controller.policy().maximum_blob_size)
976    }
977
978    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
979        Ok(self.inner().allow_application_logs)
980    }
981
982    #[cfg(web)]
983    fn send_log(&mut self, message: String, level: tracing::log::Level) {
984        let this = self.inner();
985        // Fire-and-forget: ignore errors since logging shouldn't affect execution.
986        this.execution_state_sender
987            .unbounded_send(ExecutionRequest::Log { message, level })
988            .ok();
989    }
990}
991
992/// An extension trait to determine in compile time the different behaviors between contract and
993/// services in the implementation of [`BaseRuntime`].
994trait ContractOrServiceRuntime {
995    /// Configured to `true` if the HTTP response size should be limited to the oracle response
996    /// size.
997    ///
998    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
999    /// and only storing in the block a shorter oracle response.
1000    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
1001}
1002
1003impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
1004    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
1005}
1006
1007impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
1008    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
1009}
1010
1011impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
1012    fn clone(&self) -> Self {
1013        SyncRuntimeHandle(self.0.clone())
1014    }
1015}
1016
1017impl ContractSyncRuntime {
1018    pub(crate) fn new(
1019        execution_state_sender: ExecutionStateSender,
1020        chain_id: ChainId,
1021        refund_grant_to: Option<Account>,
1022        resource_controller: ResourceController,
1023        action: &UserAction,
1024        allow_application_logs: bool,
1025    ) -> Self {
1026        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1027            SyncRuntimeInternal::new(
1028                chain_id,
1029                action.height(),
1030                action.round(),
1031                if let UserAction::Message(context, _) = action {
1032                    Some(context.into())
1033                } else {
1034                    None
1035                },
1036                execution_state_sender,
1037                None,
1038                refund_grant_to,
1039                resource_controller,
1040                action.timestamp(),
1041                allow_application_logs,
1042            ),
1043        )))
1044    }
1045
1046    /// Preloads the code of a contract into the runtime's memory.
1047    pub(crate) fn preload_contract(
1048        &self,
1049        id: ApplicationId,
1050        code: UserContractCode,
1051        description: ApplicationDescription,
1052    ) -> Result<(), ExecutionError> {
1053        let this = self
1054            .0
1055            .as_ref()
1056            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1057        let mut this_guard = this.inner();
1058
1059        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1060            entry.insert((code, description));
1061        }
1062
1063        Ok(())
1064    }
1065
1066    /// Main entry point to start executing a user action.
1067    pub(crate) fn run_action(
1068        mut self,
1069        application_id: ApplicationId,
1070        chain_id: ChainId,
1071        action: UserAction,
1072    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1073        let result = self
1074            .deref_mut()
1075            .run_action(application_id, chain_id, action)?;
1076        let runtime = self
1077            .into_inner()
1078            .expect("Runtime clones should have been freed by now");
1079
1080        Ok((result, runtime.resource_controller))
1081    }
1082}
1083
1084impl ContractSyncRuntimeHandle {
1085    fn run_action(
1086        &mut self,
1087        application_id: ApplicationId,
1088        chain_id: ChainId,
1089        action: UserAction,
1090    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1091        let finalize_context = FinalizeContext {
1092            authenticated_owner: action.signer(),
1093            chain_id,
1094            height: action.height(),
1095            round: action.round(),
1096        };
1097
1098        {
1099            let runtime = self.inner();
1100            assert_eq!(runtime.chain_id, chain_id);
1101            assert_eq!(runtime.height, action.height());
1102        }
1103
1104        let signer = action.signer();
1105        let closure = move |code: &mut UserContractInstance| match action {
1106            UserAction::Instantiate(_context, argument) => {
1107                code.instantiate(argument).map(|()| None)
1108            }
1109            UserAction::Operation(_context, operation) => {
1110                code.execute_operation(operation).map(Option::Some)
1111            }
1112            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1113            UserAction::ProcessStreams(_context, updates) => {
1114                code.process_streams(updates).map(|()| None)
1115            }
1116        };
1117
1118        let result = self.execute(application_id, signer, closure)?;
1119        self.finalize(finalize_context)?;
1120        Ok(result)
1121    }
1122
1123    /// Notifies all loaded applications that execution is finalizing.
1124    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1125        let applications = mem::take(&mut self.inner().applications_to_finalize)
1126            .into_iter()
1127            .rev();
1128
1129        self.inner().is_finalizing = true;
1130
1131        for application in applications {
1132            self.execute(application, context.authenticated_owner, |contract| {
1133                contract.finalize().map(|_| None)
1134            })?;
1135            self.inner().loaded_applications.remove(&application);
1136        }
1137
1138        Ok(())
1139    }
1140
1141    /// Executes a `closure` with the contract code for the `application_id`.
1142    fn execute(
1143        &mut self,
1144        application_id: ApplicationId,
1145        signer: Option<AccountOwner>,
1146        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1147    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1148        let contract = {
1149            let mut runtime = self.inner();
1150            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1151
1152            let status = ApplicationStatus {
1153                caller_id: None,
1154                id: application_id,
1155                description: application.description.clone(),
1156                signer,
1157            };
1158
1159            runtime.push_application(status);
1160
1161            application
1162        };
1163
1164        let result = closure(
1165            &mut contract
1166                .instance
1167                .try_lock()
1168                .expect("Application should not be already executing"),
1169        )?;
1170
1171        let mut runtime = self.inner();
1172        let application_status = runtime.pop_application();
1173        assert_eq!(application_status.caller_id, None);
1174        assert_eq!(application_status.id, application_id);
1175        assert_eq!(application_status.description, contract.description);
1176        assert_eq!(application_status.signer, signer);
1177        assert!(runtime.call_stack.is_empty());
1178
1179        Ok(result)
1180    }
1181}
1182
1183impl ContractRuntime for ContractSyncRuntimeHandle {
1184    fn authenticated_owner(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1185        let this = self.inner();
1186        Ok(this.current_application().signer)
1187    }
1188
1189    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1190        Ok(self
1191            .inner()
1192            .executing_message
1193            .map(|metadata| metadata.is_bouncing))
1194    }
1195
1196    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1197        Ok(self
1198            .inner()
1199            .executing_message
1200            .map(|metadata| metadata.origin))
1201    }
1202
1203    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1204        let this = self.inner();
1205        if this.call_stack.len() <= 1 {
1206            return Ok(None);
1207        }
1208        Ok(this.current_application().caller_id)
1209    }
1210
1211    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1212        Ok(match vm_runtime {
1213            VmRuntime::Wasm => {
1214                self.inner()
1215                    .resource_controller
1216                    .policy()
1217                    .maximum_wasm_fuel_per_block
1218            }
1219            VmRuntime::Evm => {
1220                self.inner()
1221                    .resource_controller
1222                    .policy()
1223                    .maximum_evm_fuel_per_block
1224            }
1225        })
1226    }
1227
1228    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1229        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1230    }
1231
1232    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1233        let mut this = self.inner();
1234        this.resource_controller.track_fuel(fuel, vm_runtime)
1235    }
1236
1237    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1238        let mut this = self.inner();
1239        let application = this.current_application();
1240        let application_id = application.id;
1241        let authenticated_owner = application.signer;
1242        let mut refund_grant_to = this.refund_grant_to;
1243
1244        let grant = this
1245            .resource_controller
1246            .policy()
1247            .total_price(&message.grant)?;
1248        if grant.is_zero() {
1249            refund_grant_to = None;
1250        } else {
1251            this.resource_controller.track_grant(grant)?;
1252        }
1253        let kind = if message.is_tracked {
1254            MessageKind::Tracked
1255        } else {
1256            MessageKind::Simple
1257        };
1258
1259        this.execution_state_sender
1260            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1261                message: OutgoingMessage {
1262                    destination: message.destination,
1263                    authenticated_owner,
1264                    refund_grant_to,
1265                    grant,
1266                    kind,
1267                    message: Message::User {
1268                        application_id,
1269                        bytes: message.message,
1270                    },
1271                },
1272                callback,
1273            })?
1274            .recv_response()?;
1275
1276        Ok(())
1277    }
1278
1279    fn transfer(
1280        &mut self,
1281        source: AccountOwner,
1282        destination: Account,
1283        amount: Amount,
1284    ) -> Result<(), ExecutionError> {
1285        let this = self.inner();
1286        let current_application = this.current_application();
1287        let application_id = current_application.id;
1288        let signer = current_application.signer;
1289
1290        this.execution_state_sender
1291            .send_request(|callback| ExecutionRequest::Transfer {
1292                source,
1293                destination,
1294                amount,
1295                signer,
1296                application_id,
1297                callback,
1298            })?
1299            .recv_response()?;
1300        Ok(())
1301    }
1302
1303    fn claim(
1304        &mut self,
1305        source: Account,
1306        destination: Account,
1307        amount: Amount,
1308    ) -> Result<(), ExecutionError> {
1309        let this = self.inner();
1310        let current_application = this.current_application();
1311        let application_id = current_application.id;
1312        let signer = current_application.signer;
1313
1314        this.execution_state_sender
1315            .send_request(|callback| ExecutionRequest::Claim {
1316                source,
1317                destination,
1318                amount,
1319                signer,
1320                application_id,
1321                callback,
1322            })?
1323            .recv_response()?;
1324        Ok(())
1325    }
1326
1327    fn try_call_application(
1328        &mut self,
1329        authenticated: bool,
1330        callee_id: ApplicationId,
1331        argument: Vec<u8>,
1332    ) -> Result<Vec<u8>, ExecutionError> {
1333        let contract = self
1334            .inner()
1335            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1336
1337        let value = contract
1338            .try_lock()
1339            .expect("Applications should not have reentrant calls")
1340            .execute_operation(argument)?;
1341
1342        self.inner().finish_call();
1343
1344        Ok(value)
1345    }
1346
1347    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1348        let mut this = self.inner();
1349        ensure!(
1350            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1351            ExecutionError::StreamNameTooLong
1352        );
1353        let application_id = GenericApplicationId::User(this.current_application().id);
1354        let stream_id = StreamId {
1355            stream_name,
1356            application_id,
1357        };
1358        let value_len = value.len() as u64;
1359        let index = this
1360            .execution_state_sender
1361            .send_request(|callback| ExecutionRequest::Emit {
1362                stream_id,
1363                value,
1364                callback,
1365            })?
1366            .recv_response()?;
1367        // TODO(#365): Consider separate event fee categories.
1368        this.resource_controller.track_bytes_written(value_len)?;
1369        Ok(index)
1370    }
1371
1372    fn read_event(
1373        &mut self,
1374        chain_id: ChainId,
1375        stream_name: StreamName,
1376        index: u32,
1377    ) -> Result<Vec<u8>, ExecutionError> {
1378        let mut this = self.inner();
1379        ensure!(
1380            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1381            ExecutionError::StreamNameTooLong
1382        );
1383        let application_id = GenericApplicationId::User(this.current_application().id);
1384        let stream_id = StreamId {
1385            stream_name,
1386            application_id,
1387        };
1388        let event_id = EventId {
1389            stream_id,
1390            index,
1391            chain_id,
1392        };
1393        let event = this
1394            .execution_state_sender
1395            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1396            .recv_response()?;
1397        // TODO(#365): Consider separate event fee categories.
1398        this.resource_controller
1399            .track_bytes_read(event.len() as u64)?;
1400        Ok(event)
1401    }
1402
1403    fn subscribe_to_events(
1404        &mut self,
1405        chain_id: ChainId,
1406        application_id: ApplicationId,
1407        stream_name: StreamName,
1408    ) -> Result<(), ExecutionError> {
1409        let this = self.inner();
1410        ensure!(
1411            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1412            ExecutionError::StreamNameTooLong
1413        );
1414        let stream_id = StreamId {
1415            stream_name,
1416            application_id: application_id.into(),
1417        };
1418        let subscriber_app_id = this.current_application().id;
1419        this.execution_state_sender
1420            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1421                chain_id,
1422                stream_id,
1423                subscriber_app_id,
1424                callback,
1425            })?
1426            .recv_response()?;
1427        Ok(())
1428    }
1429
1430    fn unsubscribe_from_events(
1431        &mut self,
1432        chain_id: ChainId,
1433        application_id: ApplicationId,
1434        stream_name: StreamName,
1435    ) -> Result<(), ExecutionError> {
1436        let this = self.inner();
1437        ensure!(
1438            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1439            ExecutionError::StreamNameTooLong
1440        );
1441        let stream_id = StreamId {
1442            stream_name,
1443            application_id: application_id.into(),
1444        };
1445        let subscriber_app_id = this.current_application().id;
1446        this.execution_state_sender
1447            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1448                chain_id,
1449                stream_id,
1450                subscriber_app_id,
1451                callback,
1452            })?
1453            .recv_response()?;
1454        Ok(())
1455    }
1456
1457    fn query_service(
1458        &mut self,
1459        application_id: ApplicationId,
1460        query: Vec<u8>,
1461    ) -> Result<Vec<u8>, ExecutionError> {
1462        let mut this = self.inner();
1463
1464        let app_permissions = this
1465            .execution_state_sender
1466            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1467            .recv_response()?;
1468
1469        let app_id = this.current_application().id;
1470        ensure!(
1471            app_permissions.can_call_services(&app_id),
1472            ExecutionError::UnauthorizedApplication(app_id)
1473        );
1474
1475        this.resource_controller.track_service_oracle_call()?;
1476
1477        this.run_service_oracle_query(application_id, query)
1478    }
1479
1480    fn open_chain(
1481        &mut self,
1482        ownership: ChainOwnership,
1483        application_permissions: ApplicationPermissions,
1484        balance: Amount,
1485    ) -> Result<ChainId, ExecutionError> {
1486        let parent_id = self.inner().chain_id;
1487        let block_height = self.block_height()?;
1488
1489        let timestamp = self.inner().user_context;
1490
1491        let chain_id = self
1492            .inner()
1493            .execution_state_sender
1494            .send_request(|callback| ExecutionRequest::OpenChain {
1495                ownership,
1496                balance,
1497                parent_id,
1498                block_height,
1499                timestamp,
1500                application_permissions,
1501                callback,
1502            })?
1503            .recv_response()?;
1504
1505        Ok(chain_id)
1506    }
1507
1508    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1509        let this = self.inner();
1510        let application_id = this.current_application().id;
1511        this.execution_state_sender
1512            .send_request(|callback| ExecutionRequest::CloseChain {
1513                application_id,
1514                callback,
1515            })?
1516            .recv_response()?
1517    }
1518
1519    fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1520        let this = self.inner();
1521        let application_id = this.current_application().id;
1522        this.execution_state_sender
1523            .send_request(|callback| ExecutionRequest::ChangeOwnership {
1524                application_id,
1525                ownership,
1526                callback,
1527            })?
1528            .recv_response()?
1529    }
1530
1531    fn change_application_permissions(
1532        &mut self,
1533        application_permissions: ApplicationPermissions,
1534    ) -> Result<(), ExecutionError> {
1535        let this = self.inner();
1536        let application_id = this.current_application().id;
1537        this.execution_state_sender
1538            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1539                application_id,
1540                application_permissions,
1541                callback,
1542            })?
1543            .recv_response()?
1544    }
1545
1546    fn peek_application_index(&mut self) -> Result<u32, ExecutionError> {
1547        let index = self
1548            .inner()
1549            .execution_state_sender
1550            .send_request(move |callback| ExecutionRequest::PeekApplicationIndex { callback })?
1551            .recv_response()?;
1552        Ok(index)
1553    }
1554
1555    fn create_application(
1556        &mut self,
1557        module_id: ModuleId,
1558        parameters: Vec<u8>,
1559        argument: Vec<u8>,
1560        required_application_ids: Vec<ApplicationId>,
1561    ) -> Result<ApplicationId, ExecutionError> {
1562        let chain_id = self.inner().chain_id;
1563        let block_height = self.block_height()?;
1564
1565        let CreateApplicationResult { app_id } = self
1566            .inner()
1567            .execution_state_sender
1568            .send_request(move |callback| ExecutionRequest::CreateApplication {
1569                chain_id,
1570                block_height,
1571                module_id,
1572                parameters,
1573                required_application_ids,
1574                callback,
1575            })?
1576            .recv_response()?;
1577
1578        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1579
1580        contract
1581            .try_lock()
1582            .expect("Applications should not have reentrant calls")
1583            .instantiate(argument)?;
1584
1585        self.inner().finish_call();
1586
1587        Ok(app_id)
1588    }
1589
1590    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1591        let blob = Blob::new_data(bytes);
1592        let blob_id = blob.id();
1593        let this = self.inner();
1594        this.execution_state_sender
1595            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1596            .recv_response()?;
1597        Ok(DataBlobHash(blob_id.hash))
1598    }
1599
1600    fn publish_module(
1601        &mut self,
1602        contract: Bytecode,
1603        service: Bytecode,
1604        vm_runtime: VmRuntime,
1605    ) -> Result<ModuleId, ExecutionError> {
1606        let (blobs, module_id) =
1607            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1608        let this = self.inner();
1609        for blob in blobs {
1610            this.execution_state_sender
1611                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1612                .recv_response()?;
1613        }
1614        Ok(module_id)
1615    }
1616
1617    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1618        let this = self.inner();
1619        let round = this.round;
1620        this.execution_state_sender
1621            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1622            .recv_response()
1623    }
1624
1625    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1626        let mut this = self.inner();
1627        let id = this.current_application().id;
1628        let state = this.view_user_states.entry(id).or_default();
1629        state.force_all_pending_queries()?;
1630        this.resource_controller.track_write_operations(
1631            batch
1632                .num_operations()
1633                .try_into()
1634                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1635        )?;
1636        this.resource_controller
1637            .track_bytes_written(batch.size() as u64)?;
1638        this.execution_state_sender
1639            .send_request(|callback| ExecutionRequest::WriteBatch {
1640                id,
1641                batch,
1642                callback,
1643            })?
1644            .recv_response()?;
1645        Ok(())
1646    }
1647}
1648
1649impl ServiceSyncRuntime {
1650    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1651    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1652        Self::new_with_deadline(execution_state_sender, context, None)
1653    }
1654
1655    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1656    pub fn new_with_deadline(
1657        execution_state_sender: ExecutionStateSender,
1658        context: QueryContext,
1659        deadline: Option<Instant>,
1660    ) -> Self {
1661        // Query the allow_application_logs setting from the execution state.
1662        let allow_application_logs = execution_state_sender
1663            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1664            .ok()
1665            .and_then(|receiver| receiver.recv_response().ok())
1666            .unwrap_or(false);
1667
1668        let runtime = SyncRuntime(Some(
1669            SyncRuntimeInternal::new(
1670                context.chain_id,
1671                context.next_block_height,
1672                None,
1673                None,
1674                execution_state_sender,
1675                deadline,
1676                None,
1677                ResourceController::default(),
1678                (),
1679                allow_application_logs,
1680            )
1681            .into(),
1682        ));
1683
1684        ServiceSyncRuntime {
1685            runtime,
1686            current_context: context,
1687        }
1688    }
1689
1690    /// Preloads the code of a service into the runtime's memory.
1691    pub(crate) fn preload_service(
1692        &self,
1693        id: ApplicationId,
1694        code: UserServiceCode,
1695        description: ApplicationDescription,
1696    ) -> Result<(), ExecutionError> {
1697        let this = self
1698            .runtime
1699            .0
1700            .as_ref()
1701            .expect("services shouldn't be preloaded while the runtime is being dropped");
1702        let mut this_guard = this.inner();
1703
1704        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1705            entry.insert((code, description));
1706        }
1707
1708        Ok(())
1709    }
1710
1711    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1712    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1713        while let Ok(request) = incoming_requests.recv() {
1714            let ServiceRuntimeRequest::Query {
1715                application_id,
1716                context,
1717                query,
1718                callback,
1719            } = request;
1720
1721            let result = self
1722                .prepare_for_query(context)
1723                .and_then(|()| self.run_query(application_id, query));
1724
1725            if let Err(err) = callback.send(result) {
1726                tracing::debug!(%err, "Receiver for query result has been dropped");
1727            }
1728        }
1729    }
1730
1731    /// Prepares the runtime to query an application.
1732    pub(crate) fn prepare_for_query(
1733        &mut self,
1734        new_context: QueryContext,
1735    ) -> Result<(), ExecutionError> {
1736        let expected_context = QueryContext {
1737            local_time: new_context.local_time,
1738            ..self.current_context
1739        };
1740
1741        if new_context != expected_context {
1742            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1743            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1744        } else {
1745            self.handle_mut()
1746                .inner()
1747                .execution_state_sender
1748                .send_request(|callback| ExecutionRequest::SetLocalTime {
1749                    local_time: new_context.local_time,
1750                    callback,
1751                })?
1752                .recv_response()?;
1753        }
1754        Ok(())
1755    }
1756
1757    /// Queries an application specified by its [`ApplicationId`].
1758    pub(crate) fn run_query(
1759        &mut self,
1760        application_id: ApplicationId,
1761        query: Vec<u8>,
1762    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1763        let this = self.handle_mut();
1764        let response = this.try_query_application(application_id, query)?;
1765        let operations = mem::take(&mut this.inner().scheduled_operations);
1766
1767        Ok(QueryOutcome {
1768            response,
1769            operations,
1770        })
1771    }
1772
1773    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1774    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1775        self.runtime.0.as_mut().expect(
1776            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1777        )
1778    }
1779}
1780
1781impl ServiceRuntime for ServiceSyncRuntimeHandle {
1782    /// Note that queries are not available from writable contexts.
1783    fn try_query_application(
1784        &mut self,
1785        queried_id: ApplicationId,
1786        argument: Vec<u8>,
1787    ) -> Result<Vec<u8>, ExecutionError> {
1788        let service = {
1789            let mut this = self.inner();
1790
1791            // Load the application.
1792            let application = this.load_service_instance(self.clone(), queried_id)?;
1793            // Make the call to user code.
1794            this.push_application(ApplicationStatus {
1795                caller_id: None,
1796                id: queried_id,
1797                description: application.description,
1798                signer: None,
1799            });
1800            application.instance
1801        };
1802        let response = service
1803            .try_lock()
1804            .expect("Applications should not have reentrant calls")
1805            .handle_query(argument)?;
1806        self.inner().pop_application();
1807        Ok(response)
1808    }
1809
1810    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1811        let mut this = self.inner();
1812        let application_id = this.current_application().id;
1813
1814        this.scheduled_operations.push(Operation::User {
1815            application_id,
1816            bytes: operation,
1817        });
1818
1819        Ok(())
1820    }
1821
1822    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1823        if let Some(deadline) = self.inner().deadline {
1824            if Instant::now() >= deadline {
1825                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1826            }
1827        }
1828        Ok(())
1829    }
1830}
1831
1832/// A request to the service runtime actor.
1833pub enum ServiceRuntimeRequest {
1834    Query {
1835        application_id: ApplicationId,
1836        context: QueryContext,
1837        query: Vec<u8>,
1838        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1839    },
1840}
1841
1842/// The origin of the execution.
1843#[derive(Clone, Copy, Debug)]
1844struct ExecutingMessage {
1845    is_bouncing: bool,
1846    origin: ChainId,
1847}
1848
1849impl From<&MessageContext> for ExecutingMessage {
1850    fn from(context: &MessageContext) -> Self {
1851        ExecutingMessage {
1852            is_bouncing: context.is_bouncing,
1853            origin: context.origin,
1854        }
1855    }
1856}
1857
1858/// Creates a compressed contract and service bytecode synchronously.
1859pub fn create_bytecode_blobs_sync(
1860    contract: Bytecode,
1861    service: Bytecode,
1862    vm_runtime: VmRuntime,
1863) -> (Vec<Blob>, ModuleId) {
1864    match vm_runtime {
1865        VmRuntime::Wasm => {
1866            let compressed_contract = contract.compress();
1867            let compressed_service = service.compress();
1868            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1869            let service_blob = Blob::new_service_bytecode(compressed_service);
1870            let module_id =
1871                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1872            (vec![contract_blob, service_blob], module_id)
1873        }
1874        VmRuntime::Evm => {
1875            let compressed_contract = contract.compress();
1876            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1877            let module_id = ModuleId::new(
1878                evm_contract_blob.id().hash,
1879                evm_contract_blob.id().hash,
1880                vm_runtime,
1881            );
1882            (vec![evm_contract_blob], module_id)
1883        }
1884    }
1885}