linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29    execution::UserAction,
30    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31    resources::ResourceController,
32    system::CreateApplicationResult,
33    util::{ReceiverExt, UnboundedSenderExt},
34    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
44pub trait WithContext {
45    type UserContext;
46}
47
48impl WithContext for UserContractInstance {
49    type UserContext = Timestamp;
50}
51
52impl WithContext for UserServiceInstance {
53    type UserContext = ();
54}
55
56#[cfg(test)]
57impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
58    type UserContext = ();
59}
60
61#[derive(Debug)]
62pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
63
64pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
65
66pub struct ServiceSyncRuntime {
67    runtime: SyncRuntime<UserServiceInstance>,
68    current_context: QueryContext,
69}
70
71#[derive(Debug)]
72pub struct SyncRuntimeHandle<UserInstance: WithContext>(
73    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
74);
75
76pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
77pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
78
79/// Runtime data tracked during the execution of a transaction on the synchronous thread.
80#[derive(Debug)]
81pub struct SyncRuntimeInternal<UserInstance: WithContext> {
82    /// The current chain ID.
83    chain_id: ChainId,
84    /// The height of the next block that will be added to this chain. During operations
85    /// and messages, this is the current block height.
86    height: BlockHeight,
87    /// The current consensus round. Only available during block validation in multi-leader rounds.
88    round: Option<u32>,
89    /// The current message being executed, if there is one.
90    #[debug(skip_if = Option::is_none)]
91    executing_message: Option<ExecutingMessage>,
92
93    /// How to interact with the storage view of the execution state.
94    execution_state_sender: ExecutionStateSender,
95
96    /// If applications are being finalized.
97    ///
98    /// If [`true`], disables cross-application calls.
99    is_finalizing: bool,
100    /// Applications that need to be finalized.
101    applications_to_finalize: Vec<ApplicationId>,
102
103    /// Application instances loaded in this transaction.
104    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
105    /// The current stack of application descriptions.
106    call_stack: Vec<ApplicationStatus>,
107    /// The set of the IDs of the applications that are in the `call_stack`.
108    active_applications: HashSet<ApplicationId>,
109    /// The operations scheduled during this query.
110    scheduled_operations: Vec<Operation>,
111
112    /// Track application states based on views.
113    view_user_states: BTreeMap<ApplicationId, ViewUserState>,
114
115    /// The deadline this runtime should finish executing.
116    ///
117    /// Used to limit the execution time of services running as oracles.
118    deadline: Option<Instant>,
119
120    /// Where to send a refund for the unused part of the grant after execution, if any.
121    #[debug(skip_if = Option::is_none)]
122    refund_grant_to: Option<Account>,
123    /// Controller to track fuel and storage consumption.
124    resource_controller: ResourceController,
125    /// Additional context for the runtime.
126    user_context: UserInstance::UserContext,
127}
128
129/// The runtime status of an application.
130#[derive(Debug)]
131struct ApplicationStatus {
132    /// The caller application ID, if forwarded during the call.
133    caller_id: Option<ApplicationId>,
134    /// The application ID.
135    id: ApplicationId,
136    /// The application description.
137    description: ApplicationDescription,
138    /// The authenticated signer for the execution thread, if any.
139    signer: Option<AccountOwner>,
140}
141
142/// A loaded application instance.
143#[derive(Debug)]
144struct LoadedApplication<Instance> {
145    instance: Arc<Mutex<Instance>>,
146    description: ApplicationDescription,
147}
148
149impl<Instance> LoadedApplication<Instance> {
150    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
151    fn new(instance: Instance, description: ApplicationDescription) -> Self {
152        LoadedApplication {
153            instance: Arc::new(Mutex::new(instance)),
154            description,
155        }
156    }
157}
158
159impl<Instance> Clone for LoadedApplication<Instance> {
160    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
161    // bound
162    fn clone(&self) -> Self {
163        LoadedApplication {
164            instance: self.instance.clone(),
165            description: self.description.clone(),
166        }
167    }
168}
169
170#[derive(Debug)]
171enum Promise<T> {
172    Ready(T),
173    Pending(Receiver<T>),
174}
175
176impl<T> Promise<T> {
177    fn force(&mut self) -> Result<(), ExecutionError> {
178        if let Promise::Pending(receiver) = self {
179            let value = receiver
180                .recv_ref()
181                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
182            *self = Promise::Ready(value);
183        }
184        Ok(())
185    }
186
187    fn read(self) -> Result<T, ExecutionError> {
188        match self {
189            Promise::Pending(receiver) => {
190                let value = receiver.recv_response()?;
191                Ok(value)
192            }
193            Promise::Ready(value) => Ok(value),
194        }
195    }
196}
197
198/// Manages a set of pending queries returning values of type `T`.
199#[derive(Debug, Default)]
200struct QueryManager<T> {
201    /// The queries in progress.
202    pending_queries: BTreeMap<u32, Promise<T>>,
203    /// The number of queries ever registered so far. Used for the index of the next query.
204    query_count: u32,
205    /// The number of active queries.
206    active_query_count: u32,
207}
208
209impl<T> QueryManager<T> {
210    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
211        let id = self.query_count;
212        self.pending_queries.insert(id, Promise::Pending(receiver));
213        self.query_count = self
214            .query_count
215            .checked_add(1)
216            .ok_or(ArithmeticError::Overflow)?;
217        self.active_query_count = self
218            .active_query_count
219            .checked_add(1)
220            .ok_or(ArithmeticError::Overflow)?;
221        Ok(id)
222    }
223
224    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
225        let promise = self
226            .pending_queries
227            .remove(&id)
228            .ok_or(ExecutionError::InvalidPromise)?;
229        let value = promise.read()?;
230        self.active_query_count -= 1;
231        Ok(value)
232    }
233
234    fn force_all(&mut self) -> Result<(), ExecutionError> {
235        for promise in self.pending_queries.values_mut() {
236            promise.force()?;
237        }
238        Ok(())
239    }
240}
241
242type Keys = Vec<Vec<u8>>;
243type Value = Vec<u8>;
244type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
245
246#[derive(Debug, Default)]
247struct ViewUserState {
248    /// The contains-key queries in progress.
249    contains_key_queries: QueryManager<bool>,
250    /// The contains-keys queries in progress.
251    contains_keys_queries: QueryManager<Vec<bool>>,
252    /// The read-value queries in progress.
253    read_value_queries: QueryManager<Option<Value>>,
254    /// The read-multi-values queries in progress.
255    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
256    /// The find-keys queries in progress.
257    find_keys_queries: QueryManager<Keys>,
258    /// The find-key-values queries in progress.
259    find_key_values_queries: QueryManager<KeyValues>,
260}
261
262impl ViewUserState {
263    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
264        self.contains_key_queries.force_all()?;
265        self.contains_keys_queries.force_all()?;
266        self.read_value_queries.force_all()?;
267        self.read_multi_values_queries.force_all()?;
268        self.find_keys_queries.force_all()?;
269        self.find_key_values_queries.force_all()?;
270        Ok(())
271    }
272}
273
274impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
275    type Target = SyncRuntimeHandle<UserInstance>;
276
277    fn deref(&self) -> &Self::Target {
278        self.0.as_ref().expect(
279            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
280        )
281    }
282}
283
284impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
285    fn deref_mut(&mut self) -> &mut Self::Target {
286        self.0.as_mut().expect(
287            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288        )
289    }
290}
291
292impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
293    fn drop(&mut self) {
294        // Ensure the `loaded_applications` are cleared to prevent circular references in
295        // the runtime
296        if let Some(handle) = self.0.take() {
297            handle.inner().loaded_applications.clear();
298        }
299    }
300}
301
302impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
303    #[expect(clippy::too_many_arguments)]
304    fn new(
305        chain_id: ChainId,
306        height: BlockHeight,
307        round: Option<u32>,
308        executing_message: Option<ExecutingMessage>,
309        execution_state_sender: ExecutionStateSender,
310        deadline: Option<Instant>,
311        refund_grant_to: Option<Account>,
312        resource_controller: ResourceController,
313        user_context: UserInstance::UserContext,
314    ) -> Self {
315        Self {
316            chain_id,
317            height,
318            round,
319            executing_message,
320            execution_state_sender,
321            is_finalizing: false,
322            applications_to_finalize: Vec::new(),
323            loaded_applications: HashMap::new(),
324            call_stack: Vec::new(),
325            active_applications: HashSet::new(),
326            view_user_states: BTreeMap::new(),
327            deadline,
328            refund_grant_to,
329            resource_controller,
330            scheduled_operations: Vec::new(),
331            user_context,
332        }
333    }
334
335    /// Returns the [`ApplicationStatus`] of the current application.
336    ///
337    /// The current application is the last to be pushed to the `call_stack`.
338    ///
339    /// # Panics
340    ///
341    /// If the call stack is empty.
342    fn current_application(&self) -> &ApplicationStatus {
343        self.call_stack
344            .last()
345            .expect("Call stack is unexpectedly empty")
346    }
347
348    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
349    ///
350    /// Ensures the application's ID is also tracked in the `active_applications` set.
351    fn push_application(&mut self, status: ApplicationStatus) {
352        self.active_applications.insert(status.id);
353        self.call_stack.push(status);
354    }
355
356    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
357    ///
358    /// Ensures the application's ID is also removed from the `active_applications` set.
359    ///
360    /// # Panics
361    ///
362    /// If the call stack is empty.
363    fn pop_application(&mut self) -> ApplicationStatus {
364        let status = self
365            .call_stack
366            .pop()
367            .expect("Can't remove application from empty call stack");
368        assert!(self.active_applications.remove(&status.id));
369        status
370    }
371
372    /// Ensures that a call to `application_id` is not-reentrant.
373    ///
374    /// Returns an error if there already is an entry for `application_id` in the call stack.
375    fn check_for_reentrancy(
376        &mut self,
377        application_id: ApplicationId,
378    ) -> Result<(), ExecutionError> {
379        ensure!(
380            !self.active_applications.contains(&application_id),
381            ExecutionError::ReentrantCall(application_id)
382        );
383        Ok(())
384    }
385}
386
387impl SyncRuntimeInternal<UserContractInstance> {
388    /// Loads a contract instance, initializing it with this runtime if needed.
389    fn load_contract_instance(
390        &mut self,
391        this: SyncRuntimeHandle<UserContractInstance>,
392        id: ApplicationId,
393    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
394        match self.loaded_applications.entry(id) {
395            // TODO(#2927): support dynamic loading of modules on the Web
396            #[cfg(web)]
397            hash_map::Entry::Vacant(_) => {
398                drop(this);
399                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
400                    id,
401                )))
402            }
403            #[cfg(not(web))]
404            hash_map::Entry::Vacant(entry) => {
405                let (code, description) = self
406                    .execution_state_sender
407                    .send_request(move |callback| ExecutionRequest::LoadContract { id, callback })?
408                    .recv_response()?;
409
410                let instance = code.instantiate(this)?;
411
412                self.applications_to_finalize.push(id);
413                Ok(entry
414                    .insert(LoadedApplication::new(instance, description))
415                    .clone())
416            }
417            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
418        }
419    }
420
421    /// Configures the runtime for executing a call to a different contract.
422    fn prepare_for_call(
423        &mut self,
424        this: ContractSyncRuntimeHandle,
425        authenticated: bool,
426        callee_id: ApplicationId,
427    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
428        self.check_for_reentrancy(callee_id)?;
429
430        ensure!(
431            !self.is_finalizing,
432            ExecutionError::CrossApplicationCallInFinalize {
433                caller_id: Box::new(self.current_application().id),
434                callee_id: Box::new(callee_id),
435            }
436        );
437
438        // Load the application.
439        let application = self.load_contract_instance(this, callee_id)?;
440
441        let caller = self.current_application();
442        let caller_id = caller.id;
443        let caller_signer = caller.signer;
444        // Make the call to user code.
445        let authenticated_signer = match caller_signer {
446            Some(signer) if authenticated => Some(signer),
447            _ => None,
448        };
449        let authenticated_caller_id = authenticated.then_some(caller_id);
450        self.push_application(ApplicationStatus {
451            caller_id: authenticated_caller_id,
452            id: callee_id,
453            description: application.description,
454            // Allow further nested calls to be authenticated if this one is.
455            signer: authenticated_signer,
456        });
457        Ok(application.instance)
458    }
459
460    /// Cleans up the runtime after the execution of a call to a different contract.
461    fn finish_call(&mut self) {
462        self.pop_application();
463    }
464
465    /// Runs the service in a separate thread as an oracle.
466    fn run_service_oracle_query(
467        &mut self,
468        application_id: ApplicationId,
469        query: Vec<u8>,
470    ) -> Result<Vec<u8>, ExecutionError> {
471        let timeout = self
472            .resource_controller
473            .remaining_service_oracle_execution_time()?;
474        let execution_start = Instant::now();
475        let deadline = Some(execution_start + timeout);
476        let response = self
477            .execution_state_sender
478            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
479                deadline,
480                application_id,
481                next_block_height: self.height,
482                query,
483                callback,
484            })?
485            .recv_response()?;
486
487        self.resource_controller
488            .track_service_oracle_execution(execution_start.elapsed())?;
489        self.resource_controller
490            .track_service_oracle_response(response.len())?;
491
492        Ok(response)
493    }
494}
495
496impl SyncRuntimeInternal<UserServiceInstance> {
497    /// Initializes a service instance with this runtime.
498    fn load_service_instance(
499        &mut self,
500        this: ServiceSyncRuntimeHandle,
501        id: ApplicationId,
502    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
503        match self.loaded_applications.entry(id) {
504            // TODO(#2927): support dynamic loading of modules on the Web
505            #[cfg(web)]
506            hash_map::Entry::Vacant(_) => {
507                drop(this);
508                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
509                    id,
510                )))
511            }
512            #[cfg(not(web))]
513            hash_map::Entry::Vacant(entry) => {
514                let (code, description) = self
515                    .execution_state_sender
516                    .send_request(move |callback| ExecutionRequest::LoadService { id, callback })?
517                    .recv_response()?;
518
519                let instance = code.instantiate(this)?;
520                Ok(entry
521                    .insert(LoadedApplication::new(instance, description))
522                    .clone())
523            }
524            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
525        }
526    }
527}
528
529impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
530    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
531        let handle = self.0.take().expect(
532            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
533        );
534        let runtime = Arc::into_inner(handle.0)?
535            .into_inner()
536            .expect("`SyncRuntime` should run in a single thread which should not panic");
537        Some(runtime)
538    }
539}
540
541impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
542    for SyncRuntimeHandle<UserInstance>
543{
544    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
545        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
546    }
547}
548
549impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
550    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
551        self.0
552            .try_lock()
553            .expect("Synchronous runtimes run on a single execution thread")
554    }
555}
556
557impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
558where
559    Self: ContractOrServiceRuntime,
560{
561    type Read = ();
562    type ReadValueBytes = u32;
563    type ContainsKey = u32;
564    type ContainsKeys = u32;
565    type ReadMultiValuesBytes = u32;
566    type FindKeysByPrefix = u32;
567    type FindKeyValuesByPrefix = u32;
568
569    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
570        let mut this = self.inner();
571        let chain_id = this.chain_id;
572        this.resource_controller.track_runtime_chain_id()?;
573        Ok(chain_id)
574    }
575
576    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
577        let mut this = self.inner();
578        let height = this.height;
579        this.resource_controller.track_runtime_block_height()?;
580        Ok(height)
581    }
582
583    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
584        let mut this = self.inner();
585        let application_id = this.current_application().id;
586        this.resource_controller.track_runtime_application_id()?;
587        Ok(application_id)
588    }
589
590    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
591        let mut this = self.inner();
592        let application_creator_chain_id = this.current_application().description.creator_chain_id;
593        this.resource_controller.track_runtime_application_id()?;
594        Ok(application_creator_chain_id)
595    }
596
597    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
598        let mut this = self.inner();
599        let parameters = this.current_application().description.parameters.clone();
600        this.resource_controller
601            .track_runtime_application_parameters(&parameters)?;
602        Ok(parameters)
603    }
604
605    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
606        let mut this = self.inner();
607        let timestamp = this
608            .execution_state_sender
609            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
610            .recv_response()?;
611        this.resource_controller.track_runtime_timestamp()?;
612        Ok(timestamp)
613    }
614
615    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
616        let mut this = self.inner();
617        let balance = this
618            .execution_state_sender
619            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
620            .recv_response()?;
621        this.resource_controller.track_runtime_balance()?;
622        Ok(balance)
623    }
624
625    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
626        let mut this = self.inner();
627        let balance = this
628            .execution_state_sender
629            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
630            .recv_response()?;
631        this.resource_controller.track_runtime_balance()?;
632        Ok(balance)
633    }
634
635    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
636        let mut this = self.inner();
637        let owner_balances = this
638            .execution_state_sender
639            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
640            .recv_response()?;
641        this.resource_controller
642            .track_runtime_owner_balances(&owner_balances)?;
643        Ok(owner_balances)
644    }
645
646    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
647        let mut this = self.inner();
648        let owners = this
649            .execution_state_sender
650            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
651            .recv_response()?;
652        this.resource_controller.track_runtime_owners(&owners)?;
653        Ok(owners)
654    }
655
656    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
657        let mut this = self.inner();
658        let chain_ownership = this
659            .execution_state_sender
660            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
661            .recv_response()?;
662        this.resource_controller
663            .track_runtime_chain_ownership(&chain_ownership)?;
664        Ok(chain_ownership)
665    }
666
667    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
668        let mut this = self.inner();
669        let id = this.current_application().id;
670        this.resource_controller.track_read_operation()?;
671        let receiver = this
672            .execution_state_sender
673            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
674        let state = this.view_user_states.entry(id).or_default();
675        state.contains_key_queries.register(receiver)
676    }
677
678    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
679        let mut this = self.inner();
680        let id = this.current_application().id;
681        let state = this.view_user_states.entry(id).or_default();
682        let value = state.contains_key_queries.wait(*promise)?;
683        Ok(value)
684    }
685
686    fn contains_keys_new(
687        &mut self,
688        keys: Vec<Vec<u8>>,
689    ) -> Result<Self::ContainsKeys, ExecutionError> {
690        let mut this = self.inner();
691        let id = this.current_application().id;
692        this.resource_controller.track_read_operation()?;
693        let receiver = this
694            .execution_state_sender
695            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
696        let state = this.view_user_states.entry(id).or_default();
697        state.contains_keys_queries.register(receiver)
698    }
699
700    fn contains_keys_wait(
701        &mut self,
702        promise: &Self::ContainsKeys,
703    ) -> Result<Vec<bool>, ExecutionError> {
704        let mut this = self.inner();
705        let id = this.current_application().id;
706        let state = this.view_user_states.entry(id).or_default();
707        let value = state.contains_keys_queries.wait(*promise)?;
708        Ok(value)
709    }
710
711    fn read_multi_values_bytes_new(
712        &mut self,
713        keys: Vec<Vec<u8>>,
714    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
715        let mut this = self.inner();
716        let id = this.current_application().id;
717        this.resource_controller.track_read_operation()?;
718        let receiver = this.execution_state_sender.send_request(move |callback| {
719            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
720        })?;
721        let state = this.view_user_states.entry(id).or_default();
722        state.read_multi_values_queries.register(receiver)
723    }
724
725    fn read_multi_values_bytes_wait(
726        &mut self,
727        promise: &Self::ReadMultiValuesBytes,
728    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
729        let mut this = self.inner();
730        let id = this.current_application().id;
731        let state = this.view_user_states.entry(id).or_default();
732        let values = state.read_multi_values_queries.wait(*promise)?;
733        for value in &values {
734            if let Some(value) = &value {
735                this.resource_controller
736                    .track_bytes_read(value.len() as u64)?;
737            }
738        }
739        Ok(values)
740    }
741
742    fn read_value_bytes_new(
743        &mut self,
744        key: Vec<u8>,
745    ) -> Result<Self::ReadValueBytes, ExecutionError> {
746        let mut this = self.inner();
747        let id = this.current_application().id;
748        this.resource_controller.track_read_operation()?;
749        let receiver = this
750            .execution_state_sender
751            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
752        let state = this.view_user_states.entry(id).or_default();
753        state.read_value_queries.register(receiver)
754    }
755
756    fn read_value_bytes_wait(
757        &mut self,
758        promise: &Self::ReadValueBytes,
759    ) -> Result<Option<Vec<u8>>, ExecutionError> {
760        let mut this = self.inner();
761        let id = this.current_application().id;
762        let value = {
763            let state = this.view_user_states.entry(id).or_default();
764            state.read_value_queries.wait(*promise)?
765        };
766        if let Some(value) = &value {
767            this.resource_controller
768                .track_bytes_read(value.len() as u64)?;
769        }
770        Ok(value)
771    }
772
773    fn find_keys_by_prefix_new(
774        &mut self,
775        key_prefix: Vec<u8>,
776    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
777        let mut this = self.inner();
778        let id = this.current_application().id;
779        this.resource_controller.track_read_operation()?;
780        let receiver = this.execution_state_sender.send_request(move |callback| {
781            ExecutionRequest::FindKeysByPrefix {
782                id,
783                key_prefix,
784                callback,
785            }
786        })?;
787        let state = this.view_user_states.entry(id).or_default();
788        state.find_keys_queries.register(receiver)
789    }
790
791    fn find_keys_by_prefix_wait(
792        &mut self,
793        promise: &Self::FindKeysByPrefix,
794    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
795        let mut this = self.inner();
796        let id = this.current_application().id;
797        let keys = {
798            let state = this.view_user_states.entry(id).or_default();
799            state.find_keys_queries.wait(*promise)?
800        };
801        let mut read_size = 0;
802        for key in &keys {
803            read_size += key.len();
804        }
805        this.resource_controller
806            .track_bytes_read(read_size as u64)?;
807        Ok(keys)
808    }
809
810    fn find_key_values_by_prefix_new(
811        &mut self,
812        key_prefix: Vec<u8>,
813    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
814        let mut this = self.inner();
815        let id = this.current_application().id;
816        this.resource_controller.track_read_operation()?;
817        let receiver = this.execution_state_sender.send_request(move |callback| {
818            ExecutionRequest::FindKeyValuesByPrefix {
819                id,
820                key_prefix,
821                callback,
822            }
823        })?;
824        let state = this.view_user_states.entry(id).or_default();
825        state.find_key_values_queries.register(receiver)
826    }
827
828    fn find_key_values_by_prefix_wait(
829        &mut self,
830        promise: &Self::FindKeyValuesByPrefix,
831    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
832        let mut this = self.inner();
833        let id = this.current_application().id;
834        let state = this.view_user_states.entry(id).or_default();
835        let key_values = state.find_key_values_queries.wait(*promise)?;
836        let mut read_size = 0;
837        for (key, value) in &key_values {
838            read_size += key.len() + value.len();
839        }
840        this.resource_controller
841            .track_bytes_read(read_size as u64)?;
842        Ok(key_values)
843    }
844
845    fn perform_http_request(
846        &mut self,
847        request: http::Request,
848    ) -> Result<http::Response, ExecutionError> {
849        let mut this = self.inner();
850        let app_permissions = this
851            .execution_state_sender
852            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
853            .recv_response()?;
854
855        let app_id = this.current_application().id;
856        ensure!(
857            app_permissions.can_make_http_requests(&app_id),
858            ExecutionError::UnauthorizedApplication(app_id)
859        );
860
861        this.resource_controller.track_http_request()?;
862
863        this.execution_state_sender
864            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
865                request,
866                http_responses_are_oracle_responses:
867                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
868                callback,
869            })?
870            .recv_response()
871    }
872
873    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
874        let this = self.inner();
875        this.execution_state_sender
876            .send_request(|callback| ExecutionRequest::AssertBefore {
877                timestamp,
878                callback,
879            })?
880            .recv_response()?
881    }
882
883    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
884        let this = self.inner();
885        let blob_id = hash.into();
886        let content = this
887            .execution_state_sender
888            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
889            .recv_response()?;
890        Ok(content.into_vec_or_clone())
891    }
892
893    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
894        let this = self.inner();
895        let blob_id = hash.into();
896        this.execution_state_sender
897            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
898            .recv_response()
899    }
900}
901
902/// An extension trait to determine in compile time the different behaviors between contract and
903/// services in the implementation of [`BaseRuntime`].
904trait ContractOrServiceRuntime {
905    /// Configured to `true` if the HTTP response size should be limited to the oracle response
906    /// size.
907    ///
908    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
909    /// and only storing in the block a shorter oracle response.
910    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
911}
912
913impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
914    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
915}
916
917impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
918    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
919}
920
921impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
922    fn clone(&self) -> Self {
923        SyncRuntimeHandle(self.0.clone())
924    }
925}
926
927impl ContractSyncRuntime {
928    pub(crate) fn new(
929        execution_state_sender: ExecutionStateSender,
930        chain_id: ChainId,
931        refund_grant_to: Option<Account>,
932        resource_controller: ResourceController,
933        action: &UserAction,
934    ) -> Self {
935        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
936            SyncRuntimeInternal::new(
937                chain_id,
938                action.height(),
939                action.round(),
940                if let UserAction::Message(context, _) = action {
941                    Some(context.into())
942                } else {
943                    None
944                },
945                execution_state_sender,
946                None,
947                refund_grant_to,
948                resource_controller,
949                action.timestamp(),
950            ),
951        )))
952    }
953
954    pub(crate) fn preload_contract(
955        &self,
956        id: ApplicationId,
957        code: UserContractCode,
958        description: ApplicationDescription,
959    ) -> Result<(), ExecutionError> {
960        let this = self
961            .0
962            .as_ref()
963            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
964        let runtime_handle = this.clone();
965        let mut this_guard = this.inner();
966
967        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
968            entry.insert(LoadedApplication::new(
969                code.instantiate(runtime_handle)?,
970                description,
971            ));
972            this_guard.applications_to_finalize.push(id);
973        }
974
975        Ok(())
976    }
977
978    /// Main entry point to start executing a user action.
979    pub(crate) fn run_action(
980        mut self,
981        application_id: ApplicationId,
982        chain_id: ChainId,
983        action: UserAction,
984    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
985        let result = self
986            .deref_mut()
987            .run_action(application_id, chain_id, action)?;
988        let runtime = self
989            .into_inner()
990            .expect("Runtime clones should have been freed by now");
991
992        Ok((result, runtime.resource_controller))
993    }
994}
995
996impl ContractSyncRuntimeHandle {
997    fn run_action(
998        &mut self,
999        application_id: ApplicationId,
1000        chain_id: ChainId,
1001        action: UserAction,
1002    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1003        let finalize_context = FinalizeContext {
1004            authenticated_signer: action.signer(),
1005            chain_id,
1006            height: action.height(),
1007            round: action.round(),
1008        };
1009
1010        {
1011            let runtime = self.inner();
1012            assert_eq!(runtime.chain_id, chain_id);
1013            assert_eq!(runtime.height, action.height());
1014        }
1015
1016        let signer = action.signer();
1017        let closure = move |code: &mut UserContractInstance| match action {
1018            UserAction::Instantiate(_context, argument) => {
1019                code.instantiate(argument).map(|()| None)
1020            }
1021            UserAction::Operation(_context, operation) => {
1022                code.execute_operation(operation).map(Option::Some)
1023            }
1024            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1025            UserAction::ProcessStreams(_context, updates) => {
1026                code.process_streams(updates).map(|()| None)
1027            }
1028        };
1029
1030        let result = self.execute(application_id, signer, closure)?;
1031        self.finalize(finalize_context)?;
1032        Ok(result)
1033    }
1034
1035    /// Notifies all loaded applications that execution is finalizing.
1036    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1037        let applications = mem::take(&mut self.inner().applications_to_finalize)
1038            .into_iter()
1039            .rev();
1040
1041        self.inner().is_finalizing = true;
1042
1043        for application in applications {
1044            self.execute(application, context.authenticated_signer, |contract| {
1045                contract.finalize().map(|_| None)
1046            })?;
1047            self.inner().loaded_applications.remove(&application);
1048        }
1049
1050        Ok(())
1051    }
1052
1053    /// Executes a `closure` with the contract code for the `application_id`.
1054    fn execute(
1055        &mut self,
1056        application_id: ApplicationId,
1057        signer: Option<AccountOwner>,
1058        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1059    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1060        let contract = {
1061            let mut runtime = self.inner();
1062            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1063
1064            let status = ApplicationStatus {
1065                caller_id: None,
1066                id: application_id,
1067                description: application.description.clone(),
1068                signer,
1069            };
1070
1071            runtime.push_application(status);
1072
1073            application
1074        };
1075
1076        let result = closure(
1077            &mut contract
1078                .instance
1079                .try_lock()
1080                .expect("Application should not be already executing"),
1081        )?;
1082
1083        let mut runtime = self.inner();
1084        let application_status = runtime.pop_application();
1085        assert_eq!(application_status.caller_id, None);
1086        assert_eq!(application_status.id, application_id);
1087        assert_eq!(application_status.description, contract.description);
1088        assert_eq!(application_status.signer, signer);
1089        assert!(runtime.call_stack.is_empty());
1090
1091        Ok(result)
1092    }
1093}
1094
1095impl ContractRuntime for ContractSyncRuntimeHandle {
1096    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1097        let this = self.inner();
1098        Ok(this.current_application().signer)
1099    }
1100
1101    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1102        Ok(self
1103            .inner()
1104            .executing_message
1105            .map(|metadata| metadata.is_bouncing))
1106    }
1107
1108    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1109        Ok(self
1110            .inner()
1111            .executing_message
1112            .map(|metadata| metadata.origin))
1113    }
1114
1115    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1116        let this = self.inner();
1117        if this.call_stack.len() <= 1 {
1118            return Ok(None);
1119        }
1120        Ok(this.current_application().caller_id)
1121    }
1122
1123    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1124        Ok(match vm_runtime {
1125            VmRuntime::Wasm => {
1126                self.inner()
1127                    .resource_controller
1128                    .policy()
1129                    .maximum_wasm_fuel_per_block
1130            }
1131            VmRuntime::Evm => {
1132                self.inner()
1133                    .resource_controller
1134                    .policy()
1135                    .maximum_evm_fuel_per_block
1136            }
1137        })
1138    }
1139
1140    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1141        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1142    }
1143
1144    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1145        let mut this = self.inner();
1146        this.resource_controller.track_fuel(fuel, vm_runtime)
1147    }
1148
1149    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1150        let mut this = self.inner();
1151        let application = this.current_application();
1152        let application_id = application.id;
1153        let authenticated_signer = application.signer;
1154        let mut refund_grant_to = this.refund_grant_to;
1155
1156        let grant = this
1157            .resource_controller
1158            .policy()
1159            .total_price(&message.grant)?;
1160        if grant.is_zero() {
1161            refund_grant_to = None;
1162        } else {
1163            this.resource_controller.track_grant(grant)?;
1164        }
1165        let kind = if message.is_tracked {
1166            MessageKind::Tracked
1167        } else {
1168            MessageKind::Simple
1169        };
1170
1171        this.execution_state_sender
1172            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1173                message: OutgoingMessage {
1174                    destination: message.destination,
1175                    authenticated_signer,
1176                    refund_grant_to,
1177                    grant,
1178                    kind,
1179                    message: Message::User {
1180                        application_id,
1181                        bytes: message.message,
1182                    },
1183                },
1184                callback,
1185            })?
1186            .recv_response()?;
1187
1188        Ok(())
1189    }
1190
1191    fn transfer(
1192        &mut self,
1193        source: AccountOwner,
1194        destination: Account,
1195        amount: Amount,
1196    ) -> Result<(), ExecutionError> {
1197        let this = self.inner();
1198        let current_application = this.current_application();
1199        let application_id = current_application.id;
1200        let signer = current_application.signer;
1201
1202        this.execution_state_sender
1203            .send_request(|callback| ExecutionRequest::Transfer {
1204                source,
1205                destination,
1206                amount,
1207                signer,
1208                application_id,
1209                callback,
1210            })?
1211            .recv_response()?;
1212        Ok(())
1213    }
1214
1215    fn claim(
1216        &mut self,
1217        source: Account,
1218        destination: Account,
1219        amount: Amount,
1220    ) -> Result<(), ExecutionError> {
1221        let this = self.inner();
1222        let current_application = this.current_application();
1223        let application_id = current_application.id;
1224        let signer = current_application.signer;
1225
1226        this.execution_state_sender
1227            .send_request(|callback| ExecutionRequest::Claim {
1228                source,
1229                destination,
1230                amount,
1231                signer,
1232                application_id,
1233                callback,
1234            })?
1235            .recv_response()?;
1236        Ok(())
1237    }
1238
1239    fn try_call_application(
1240        &mut self,
1241        authenticated: bool,
1242        callee_id: ApplicationId,
1243        argument: Vec<u8>,
1244    ) -> Result<Vec<u8>, ExecutionError> {
1245        let contract = self
1246            .inner()
1247            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1248
1249        let value = contract
1250            .try_lock()
1251            .expect("Applications should not have reentrant calls")
1252            .execute_operation(argument)?;
1253
1254        self.inner().finish_call();
1255
1256        Ok(value)
1257    }
1258
1259    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1260        let mut this = self.inner();
1261        ensure!(
1262            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1263            ExecutionError::StreamNameTooLong
1264        );
1265        let application_id = GenericApplicationId::User(this.current_application().id);
1266        let stream_id = StreamId {
1267            stream_name,
1268            application_id,
1269        };
1270        let value_len = value.len() as u64;
1271        let index = this
1272            .execution_state_sender
1273            .send_request(|callback| ExecutionRequest::Emit {
1274                stream_id,
1275                value,
1276                callback,
1277            })?
1278            .recv_response()?;
1279        // TODO(#365): Consider separate event fee categories.
1280        this.resource_controller.track_bytes_written(value_len)?;
1281        Ok(index)
1282    }
1283
1284    fn read_event(
1285        &mut self,
1286        chain_id: ChainId,
1287        stream_name: StreamName,
1288        index: u32,
1289    ) -> Result<Vec<u8>, ExecutionError> {
1290        let mut this = self.inner();
1291        ensure!(
1292            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1293            ExecutionError::StreamNameTooLong
1294        );
1295        let application_id = GenericApplicationId::User(this.current_application().id);
1296        let stream_id = StreamId {
1297            stream_name,
1298            application_id,
1299        };
1300        let event_id = EventId {
1301            stream_id,
1302            index,
1303            chain_id,
1304        };
1305        let event = this
1306            .execution_state_sender
1307            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1308            .recv_response()?;
1309        // TODO(#365): Consider separate event fee categories.
1310        this.resource_controller
1311            .track_bytes_read(event.len() as u64)?;
1312        Ok(event)
1313    }
1314
1315    fn subscribe_to_events(
1316        &mut self,
1317        chain_id: ChainId,
1318        application_id: ApplicationId,
1319        stream_name: StreamName,
1320    ) -> Result<(), ExecutionError> {
1321        let this = self.inner();
1322        ensure!(
1323            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1324            ExecutionError::StreamNameTooLong
1325        );
1326        let stream_id = StreamId {
1327            stream_name,
1328            application_id: application_id.into(),
1329        };
1330        let subscriber_app_id = this.current_application().id;
1331        this.execution_state_sender
1332            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1333                chain_id,
1334                stream_id,
1335                subscriber_app_id,
1336                callback,
1337            })?
1338            .recv_response()?;
1339        Ok(())
1340    }
1341
1342    fn unsubscribe_from_events(
1343        &mut self,
1344        chain_id: ChainId,
1345        application_id: ApplicationId,
1346        stream_name: StreamName,
1347    ) -> Result<(), ExecutionError> {
1348        let this = self.inner();
1349        ensure!(
1350            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1351            ExecutionError::StreamNameTooLong
1352        );
1353        let stream_id = StreamId {
1354            stream_name,
1355            application_id: application_id.into(),
1356        };
1357        let subscriber_app_id = this.current_application().id;
1358        this.execution_state_sender
1359            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1360                chain_id,
1361                stream_id,
1362                subscriber_app_id,
1363                callback,
1364            })?
1365            .recv_response()?;
1366        Ok(())
1367    }
1368
1369    fn query_service(
1370        &mut self,
1371        application_id: ApplicationId,
1372        query: Vec<u8>,
1373    ) -> Result<Vec<u8>, ExecutionError> {
1374        let mut this = self.inner();
1375
1376        let app_permissions = this
1377            .execution_state_sender
1378            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1379            .recv_response()?;
1380
1381        let app_id = this.current_application().id;
1382        ensure!(
1383            app_permissions.can_call_services(&app_id),
1384            ExecutionError::UnauthorizedApplication(app_id)
1385        );
1386
1387        this.resource_controller.track_service_oracle_call()?;
1388
1389        this.run_service_oracle_query(application_id, query)
1390    }
1391
1392    fn open_chain(
1393        &mut self,
1394        ownership: ChainOwnership,
1395        application_permissions: ApplicationPermissions,
1396        balance: Amount,
1397    ) -> Result<ChainId, ExecutionError> {
1398        let parent_id = self.inner().chain_id;
1399        let block_height = self.block_height()?;
1400
1401        let timestamp = self.inner().user_context;
1402
1403        let chain_id = self
1404            .inner()
1405            .execution_state_sender
1406            .send_request(|callback| ExecutionRequest::OpenChain {
1407                ownership,
1408                balance,
1409                parent_id,
1410                block_height,
1411                timestamp,
1412                application_permissions,
1413                callback,
1414            })?
1415            .recv_response()?;
1416
1417        Ok(chain_id)
1418    }
1419
1420    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1421        let this = self.inner();
1422        let application_id = this.current_application().id;
1423        this.execution_state_sender
1424            .send_request(|callback| ExecutionRequest::CloseChain {
1425                application_id,
1426                callback,
1427            })?
1428            .recv_response()?
1429    }
1430
1431    fn change_application_permissions(
1432        &mut self,
1433        application_permissions: ApplicationPermissions,
1434    ) -> Result<(), ExecutionError> {
1435        let this = self.inner();
1436        let application_id = this.current_application().id;
1437        this.execution_state_sender
1438            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1439                application_id,
1440                application_permissions,
1441                callback,
1442            })?
1443            .recv_response()?
1444    }
1445
1446    fn create_application(
1447        &mut self,
1448        module_id: ModuleId,
1449        parameters: Vec<u8>,
1450        argument: Vec<u8>,
1451        required_application_ids: Vec<ApplicationId>,
1452    ) -> Result<ApplicationId, ExecutionError> {
1453        let chain_id = self.inner().chain_id;
1454        let block_height = self.block_height()?;
1455
1456        let CreateApplicationResult { app_id } = self
1457            .inner()
1458            .execution_state_sender
1459            .send_request(move |callback| ExecutionRequest::CreateApplication {
1460                chain_id,
1461                block_height,
1462                module_id,
1463                parameters,
1464                required_application_ids,
1465                callback,
1466            })?
1467            .recv_response()?;
1468
1469        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1470
1471        contract
1472            .try_lock()
1473            .expect("Applications should not have reentrant calls")
1474            .instantiate(argument)?;
1475
1476        self.inner().finish_call();
1477
1478        Ok(app_id)
1479    }
1480
1481    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1482        let blob = Blob::new_data(bytes);
1483        let blob_id = blob.id();
1484        let this = self.inner();
1485        this.execution_state_sender
1486            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1487            .recv_response()?;
1488        Ok(DataBlobHash(blob_id.hash))
1489    }
1490
1491    fn publish_module(
1492        &mut self,
1493        contract: Bytecode,
1494        service: Bytecode,
1495        vm_runtime: VmRuntime,
1496    ) -> Result<ModuleId, ExecutionError> {
1497        let (blobs, module_id) =
1498            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1499        let this = self.inner();
1500        for blob in blobs {
1501            this.execution_state_sender
1502                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1503                .recv_response()?;
1504        }
1505        Ok(module_id)
1506    }
1507
1508    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1509        let this = self.inner();
1510        let round = this.round;
1511        this.execution_state_sender
1512            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1513            .recv_response()
1514    }
1515
1516    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1517        let mut this = self.inner();
1518        let id = this.current_application().id;
1519        let state = this.view_user_states.entry(id).or_default();
1520        state.force_all_pending_queries()?;
1521        this.resource_controller.track_write_operations(
1522            batch
1523                .num_operations()
1524                .try_into()
1525                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1526        )?;
1527        this.resource_controller
1528            .track_bytes_written(batch.size() as u64)?;
1529        this.execution_state_sender
1530            .send_request(|callback| ExecutionRequest::WriteBatch {
1531                id,
1532                batch,
1533                callback,
1534            })?
1535            .recv_response()?;
1536        Ok(())
1537    }
1538}
1539
1540impl ServiceSyncRuntime {
1541    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1542    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1543        Self::new_with_deadline(execution_state_sender, context, None)
1544    }
1545
1546    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1547    pub fn new_with_deadline(
1548        execution_state_sender: ExecutionStateSender,
1549        context: QueryContext,
1550        deadline: Option<Instant>,
1551    ) -> Self {
1552        let runtime = SyncRuntime(Some(
1553            SyncRuntimeInternal::new(
1554                context.chain_id,
1555                context.next_block_height,
1556                None,
1557                None,
1558                execution_state_sender,
1559                deadline,
1560                None,
1561                ResourceController::default(),
1562                (),
1563            )
1564            .into(),
1565        ));
1566
1567        ServiceSyncRuntime {
1568            runtime,
1569            current_context: context,
1570        }
1571    }
1572
1573    /// Loads a service into the runtime's memory.
1574    pub(crate) fn preload_service(
1575        &self,
1576        id: ApplicationId,
1577        code: UserServiceCode,
1578        description: ApplicationDescription,
1579    ) -> Result<(), ExecutionError> {
1580        let this = self
1581            .runtime
1582            .0
1583            .as_ref()
1584            .expect("services shouldn't be preloaded while the runtime is being dropped");
1585        let runtime_handle = this.clone();
1586        let mut this_guard = this.inner();
1587
1588        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1589            entry.insert(LoadedApplication::new(
1590                code.instantiate(runtime_handle)?,
1591                description,
1592            ));
1593            this_guard.applications_to_finalize.push(id);
1594        }
1595
1596        Ok(())
1597    }
1598
1599    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1600    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1601        while let Ok(request) = incoming_requests.recv() {
1602            let ServiceRuntimeRequest::Query {
1603                application_id,
1604                context,
1605                query,
1606                callback,
1607            } = request;
1608
1609            let result = self
1610                .prepare_for_query(context)
1611                .and_then(|()| self.run_query(application_id, query));
1612
1613            if let Err(err) = callback.send(result) {
1614                tracing::debug!(%err, "Receiver for query result has been dropped");
1615            }
1616        }
1617    }
1618
1619    /// Prepares the runtime to query an application.
1620    pub(crate) fn prepare_for_query(
1621        &mut self,
1622        new_context: QueryContext,
1623    ) -> Result<(), ExecutionError> {
1624        let expected_context = QueryContext {
1625            local_time: new_context.local_time,
1626            ..self.current_context
1627        };
1628
1629        if new_context != expected_context {
1630            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1631            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1632        } else {
1633            self.handle_mut()
1634                .inner()
1635                .execution_state_sender
1636                .send_request(|callback| ExecutionRequest::SetLocalTime {
1637                    local_time: new_context.local_time,
1638                    callback,
1639                })?
1640                .recv_response()?;
1641        }
1642        Ok(())
1643    }
1644
1645    /// Queries an application specified by its [`ApplicationId`].
1646    pub(crate) fn run_query(
1647        &mut self,
1648        application_id: ApplicationId,
1649        query: Vec<u8>,
1650    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1651        let this = self.handle_mut();
1652        let response = this.try_query_application(application_id, query)?;
1653        let operations = mem::take(&mut this.inner().scheduled_operations);
1654
1655        Ok(QueryOutcome {
1656            response,
1657            operations,
1658        })
1659    }
1660
1661    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1662    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1663        self.runtime.0.as_mut().expect(
1664            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1665        )
1666    }
1667}
1668
1669impl ServiceRuntime for ServiceSyncRuntimeHandle {
1670    /// Note that queries are not available from writable contexts.
1671    fn try_query_application(
1672        &mut self,
1673        queried_id: ApplicationId,
1674        argument: Vec<u8>,
1675    ) -> Result<Vec<u8>, ExecutionError> {
1676        let service = {
1677            let mut this = self.inner();
1678
1679            // Load the application.
1680            let application = this.load_service_instance(self.clone(), queried_id)?;
1681            // Make the call to user code.
1682            this.push_application(ApplicationStatus {
1683                caller_id: None,
1684                id: queried_id,
1685                description: application.description,
1686                signer: None,
1687            });
1688            application.instance
1689        };
1690        let response = service
1691            .try_lock()
1692            .expect("Applications should not have reentrant calls")
1693            .handle_query(argument)?;
1694        self.inner().pop_application();
1695        Ok(response)
1696    }
1697
1698    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1699        let mut this = self.inner();
1700        let application_id = this.current_application().id;
1701
1702        this.scheduled_operations.push(Operation::User {
1703            application_id,
1704            bytes: operation,
1705        });
1706
1707        Ok(())
1708    }
1709
1710    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1711        if let Some(deadline) = self.inner().deadline {
1712            if Instant::now() >= deadline {
1713                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1714            }
1715        }
1716        Ok(())
1717    }
1718}
1719
1720/// A request to the service runtime actor.
1721pub enum ServiceRuntimeRequest {
1722    Query {
1723        application_id: ApplicationId,
1724        context: QueryContext,
1725        query: Vec<u8>,
1726        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1727    },
1728}
1729
1730/// The origin of the execution.
1731#[derive(Clone, Copy, Debug)]
1732struct ExecutingMessage {
1733    is_bouncing: bool,
1734    origin: ChainId,
1735}
1736
1737impl From<&MessageContext> for ExecutingMessage {
1738    fn from(context: &MessageContext) -> Self {
1739        ExecutingMessage {
1740            is_bouncing: context.is_bouncing,
1741            origin: context.origin,
1742        }
1743    }
1744}
1745
1746/// Creates a compressed contract and service bytecode synchronously.
1747pub fn create_bytecode_blobs_sync(
1748    contract: Bytecode,
1749    service: Bytecode,
1750    vm_runtime: VmRuntime,
1751) -> (Vec<Blob>, ModuleId) {
1752    match vm_runtime {
1753        VmRuntime::Wasm => {
1754            let compressed_contract = contract.compress();
1755            let compressed_service = service.compress();
1756            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1757            let service_blob = Blob::new_service_bytecode(compressed_service);
1758            let module_id =
1759                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1760            (vec![contract_blob, service_blob], module_id)
1761        }
1762        VmRuntime::Evm => {
1763            let compressed_contract = contract.compress();
1764            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1765            let module_id = ModuleId::new(
1766                evm_contract_blob.id().hash,
1767                evm_contract_blob.id().hash,
1768                vm_runtime,
1769            );
1770            (vec![evm_contract_blob], module_id)
1771        }
1772    }
1773}