linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9    time::Instant,
10};
11
12use custom_debug_derive::Debug;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{
16        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
17        OracleResponse, SendMessageRequest, Timestamp,
18    },
19    ensure, http,
20    identifiers::{
21        Account, AccountOwner, BlobId, BlobType, ChainId, EventId, GenericApplicationId, MessageId,
22        StreamId, StreamName,
23    },
24    ownership::ChainOwnership,
25    vm::VmRuntime,
26};
27use linera_views::batch::Batch;
28use oneshot::Receiver;
29
30use crate::{
31    execution::UserAction,
32    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
33    resources::ResourceController,
34    system::CreateApplicationResult,
35    util::{ReceiverExt, UnboundedSenderExt},
36    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, ExecutionError,
37    FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation, OutgoingMessage,
38    QueryContext, QueryOutcome, ServiceRuntime, TransactionTracker, UserContractCode,
39    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
40};
41
42#[cfg(test)]
43#[path = "unit_tests/runtime_tests.rs"]
44mod tests;
45
/// Associates a user instance type with the extra context the runtime keeps for it.
pub trait WithContext {
    /// The type of the `user_context` value stored in [`SyncRuntimeInternal`] for this
    /// instance type.
    type UserContext;
}
49
/// Runtimes for contract instances carry a [`Timestamp`] as their user context.
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
}
53
/// Runtimes for service instances carry no additional user context.
impl WithContext for UserServiceInstance {
    type UserContext = ();
}
57
/// Test-only: allows a type-erased `Arc<dyn Any + Send + Sync>` to be used as the user
/// instance type of a runtime, with no additional user context.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
}
62
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The inner `Option` is emptied when the contents are moved out with `into_inner` and
/// when the wrapper is dropped; dereferencing an emptied wrapper panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
65
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
67
/// A synchronous runtime for service instances, paired with a [`QueryContext`].
pub struct ServiceSyncRuntime {
    /// The underlying runtime specialized for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context stored alongside the runtime.
    // NOTE(review): presumably the context of the query currently being served — confirm
    // against the methods that read this field.
    current_context: QueryContext,
}
72
/// A handle to a [`SyncRuntimeInternal`] kept behind an `Arc<Mutex<_>>`.
///
/// The mutex is acquired with `try_lock` (see `inner`), which panics instead of blocking
/// when the lock cannot be taken.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
77
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
80
/// Runtime data tracked during the execution of a transaction on the synchronous thread.
///
/// The `UserInstance` parameter selects the kind of application instance that is loaded
/// (contract or service) and, through [`WithContext`], the type of `user_context`.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The current chain ID.
    chain_id: ChainId,
    /// The height of the next block that will be added to this chain. During operations
    /// and messages, this is the current block height.
    height: BlockHeight,
    /// The current consensus round. Only available during block validation in multi-leader rounds.
    round: Option<u32>,
    /// The authenticated signer of the operation or message, if any.
    #[debug(skip_if = Option::is_none)]
    authenticated_signer: Option<AccountOwner>,
    /// The current message being executed, if there is one.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// How to interact with the storage view of the execution state.
    execution_state_sender: ExecutionStateSender,

    /// If applications are being finalized.
    ///
    /// If [`true`], disables cross-application calls.
    is_finalizing: bool,
    /// Applications that need to be finalized.
    applications_to_finalize: Vec<ApplicationId>,

    /// Application instances loaded in this transaction, keyed by application ID.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The stack of applications currently being executed. The last entry is the current
    /// application.
    call_stack: Vec<ApplicationStatus>,
    /// The set of the IDs of the applications that are in the `call_stack`.
    active_applications: HashSet<ApplicationId>,
    /// The tracking information for this transaction.
    transaction_tracker: TransactionTracker,
    /// The operations scheduled during this query.
    scheduled_operations: Vec<Operation>,

    /// Track application states based on views.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// The deadline this runtime should finish executing.
    ///
    /// Used to limit the execution time of services running as oracles.
    deadline: Option<Instant>,

    /// Where to send a refund for the unused part of the grant after execution, if any.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Controller to track fuel and storage consumption.
    resource_controller: ResourceController,
    /// Additional context for the runtime; its type is chosen by the `UserInstance`'s
    /// [`WithContext`] implementation.
    user_context: UserInstance::UserContext,
}
135
/// The runtime status of an application.
///
/// One entry is kept in the runtime's `call_stack` for every application that is currently
/// executing.
#[derive(Debug)]
struct ApplicationStatus {
    /// The caller application ID, if forwarded during the call.
    caller_id: Option<ApplicationId>,
    /// The application ID.
    id: ApplicationId,
    /// The application description.
    description: ApplicationDescription,
    /// The authenticated signer for the execution thread, if any.
    signer: Option<AccountOwner>,
}
148
/// A loaded application instance.
///
/// Cloning is cheap with respect to the instance: clones share the same `instance` through
/// the `Arc`.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The shared, mutex-protected application instance.
    instance: Arc<Mutex<Instance>>,
    /// The description of the loaded application.
    description: ApplicationDescription,
}
155
156impl<Instance> LoadedApplication<Instance> {
157    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
158    fn new(instance: Instance, description: ApplicationDescription) -> Self {
159        LoadedApplication {
160            instance: Arc::new(Mutex::new(instance)),
161            description,
162        }
163    }
164}
165
166impl<Instance> Clone for LoadedApplication<Instance> {
167    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
168    // bound
169    fn clone(&self) -> Self {
170        LoadedApplication {
171            instance: self.instance.clone(),
172            description: self.description.clone(),
173        }
174    }
175}
176
/// A value that is either already available or still expected from a one-shot channel.
#[derive(Debug)]
enum Promise<T> {
    /// The value has been received.
    Ready(T),
    /// The value has not been received yet; it will arrive through this receiver.
    Pending(Receiver<T>),
}
182
183impl<T> Promise<T> {
184    fn force(&mut self) -> Result<(), ExecutionError> {
185        if let Promise::Pending(receiver) = self {
186            let value = receiver
187                .recv_ref()
188                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
189            *self = Promise::Ready(value);
190        }
191        Ok(())
192    }
193
194    fn read(self) -> Result<T, ExecutionError> {
195        match self {
196            Promise::Pending(receiver) => {
197                let value = receiver.recv_response()?;
198                Ok(value)
199            }
200            Promise::Ready(value) => Ok(value),
201        }
202    }
203}
204
/// Manages a set of pending queries returning values of type `T`.
///
/// Each registered query is identified by a `u32` index handed back to the caller, who later
/// uses it to wait for the result.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The queries in progress, keyed by the index returned at registration.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries ever registered so far. Used for the index of the next query.
    query_count: u32,
    /// The number of active queries.
    active_query_count: u32,
}
215
216impl<T> QueryManager<T> {
217    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
218        let id = self.query_count;
219        self.pending_queries.insert(id, Promise::Pending(receiver));
220        self.query_count = self
221            .query_count
222            .checked_add(1)
223            .ok_or(ArithmeticError::Overflow)?;
224        self.active_query_count = self
225            .active_query_count
226            .checked_add(1)
227            .ok_or(ArithmeticError::Overflow)?;
228        Ok(id)
229    }
230
231    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
232        let promise = self
233            .pending_queries
234            .remove(&id)
235            .ok_or(ExecutionError::InvalidPromise)?;
236        let value = promise.read()?;
237        self.active_query_count -= 1;
238        Ok(value)
239    }
240
241    fn force_all(&mut self) -> Result<(), ExecutionError> {
242        for promise in self.pending_queries.values_mut() {
243            promise.force()?;
244        }
245        Ok(())
246    }
247}
248
/// A list of keys, each a byte vector.
type Keys = Vec<Vec<u8>>;
/// A single value, as bytes.
type Value = Vec<u8>;
/// A list of `(key, value)` pairs, both as bytes.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
252
/// The in-flight storage queries of one application, with one [`QueryManager`] per kind of
/// query.
#[derive(Debug, Default)]
struct ViewUserState {
    /// The contains-key queries in progress.
    contains_key_queries: QueryManager<bool>,
    /// The contains-keys queries in progress.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// The read-value queries in progress.
    read_value_queries: QueryManager<Option<Value>>,
    /// The read-multi-values queries in progress.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// The find-keys queries in progress.
    find_keys_queries: QueryManager<Keys>,
    /// The find-key-values queries in progress.
    find_key_values_queries: QueryManager<KeyValues>,
}
268
269impl ViewUserState {
270    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
271        self.contains_key_queries.force_all()?;
272        self.contains_keys_queries.force_all()?;
273        self.read_value_queries.force_all()?;
274        self.read_multi_values_queries.force_all()?;
275        self.find_keys_queries.force_all()?;
276        self.find_key_values_queries.force_all()?;
277        Ok(())
278    }
279}
280
281impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
282    type Target = SyncRuntimeHandle<UserInstance>;
283
284    fn deref(&self) -> &Self::Target {
285        self.0.as_ref().expect(
286            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
287        )
288    }
289}
290
291impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
292    fn deref_mut(&mut self) -> &mut Self::Target {
293        self.0.as_mut().expect(
294            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
295        )
296    }
297}
298
299impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
300    fn drop(&mut self) {
301        // Ensure the `loaded_applications` are cleared to prevent circular references in
302        // the runtime
303        if let Some(handle) = self.0.take() {
304            handle.inner().loaded_applications.clear();
305        }
306    }
307}
308
309impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
310    #[expect(clippy::too_many_arguments)]
311    fn new(
312        chain_id: ChainId,
313        height: BlockHeight,
314        round: Option<u32>,
315        authenticated_signer: Option<AccountOwner>,
316        executing_message: Option<ExecutingMessage>,
317        execution_state_sender: ExecutionStateSender,
318        deadline: Option<Instant>,
319        refund_grant_to: Option<Account>,
320        resource_controller: ResourceController,
321        transaction_tracker: TransactionTracker,
322        user_context: UserInstance::UserContext,
323    ) -> Self {
324        Self {
325            chain_id,
326            height,
327            round,
328            authenticated_signer,
329            executing_message,
330            execution_state_sender,
331            is_finalizing: false,
332            applications_to_finalize: Vec::new(),
333            loaded_applications: HashMap::new(),
334            call_stack: Vec::new(),
335            active_applications: HashSet::new(),
336            view_user_states: BTreeMap::new(),
337            deadline,
338            refund_grant_to,
339            resource_controller,
340            transaction_tracker,
341            scheduled_operations: Vec::new(),
342            user_context,
343        }
344    }
345
346    /// Returns the [`ApplicationStatus`] of the current application.
347    ///
348    /// The current application is the last to be pushed to the `call_stack`.
349    ///
350    /// # Panics
351    ///
352    /// If the call stack is empty.
353    fn current_application(&self) -> &ApplicationStatus {
354        self.call_stack
355            .last()
356            .expect("Call stack is unexpectedly empty")
357    }
358
359    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
360    ///
361    /// Ensures the application's ID is also tracked in the `active_applications` set.
362    fn push_application(&mut self, status: ApplicationStatus) {
363        self.active_applications.insert(status.id);
364        self.call_stack.push(status);
365    }
366
367    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
368    ///
369    /// Ensures the application's ID is also removed from the `active_applications` set.
370    ///
371    /// # Panics
372    ///
373    /// If the call stack is empty.
374    fn pop_application(&mut self) -> ApplicationStatus {
375        let status = self
376            .call_stack
377            .pop()
378            .expect("Can't remove application from empty call stack");
379        assert!(self.active_applications.remove(&status.id));
380        status
381    }
382
383    /// Ensures that a call to `application_id` is not-reentrant.
384    ///
385    /// Returns an error if there already is an entry for `application_id` in the call stack.
386    fn check_for_reentrancy(
387        &mut self,
388        application_id: ApplicationId,
389    ) -> Result<(), ExecutionError> {
390        ensure!(
391            !self.active_applications.contains(&application_id),
392            ExecutionError::ReentrantCall(application_id)
393        );
394        Ok(())
395    }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399    /// Loads a contract instance, initializing it with this runtime if needed.
400    fn load_contract_instance(
401        &mut self,
402        this: SyncRuntimeHandle<UserContractInstance>,
403        id: ApplicationId,
404    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405        match self.loaded_applications.entry(id) {
406            // TODO(#2927): support dynamic loading of modules on the Web
407            #[cfg(web)]
408            hash_map::Entry::Vacant(_) => {
409                drop(this);
410                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
411                    id,
412                )))
413            }
414            #[cfg(not(web))]
415            hash_map::Entry::Vacant(entry) => {
416                let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
417                let (code, description, txn_tracker_moved) = self
418                    .execution_state_sender
419                    .send_request(move |callback| ExecutionRequest::LoadContract {
420                        id,
421                        callback,
422                        txn_tracker: txn_tracker_moved,
423                    })?
424                    .recv_response()?;
425                self.transaction_tracker = txn_tracker_moved;
426
427                let instance = code.instantiate(this)?;
428
429                self.applications_to_finalize.push(id);
430                Ok(entry
431                    .insert(LoadedApplication::new(instance, description))
432                    .clone())
433            }
434            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
435        }
436    }
437
438    /// Configures the runtime for executing a call to a different contract.
439    fn prepare_for_call(
440        &mut self,
441        this: ContractSyncRuntimeHandle,
442        authenticated: bool,
443        callee_id: ApplicationId,
444    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
445        self.check_for_reentrancy(callee_id)?;
446
447        ensure!(
448            !self.is_finalizing,
449            ExecutionError::CrossApplicationCallInFinalize {
450                caller_id: Box::new(self.current_application().id),
451                callee_id: Box::new(callee_id),
452            }
453        );
454
455        // Load the application.
456        let application = self.load_contract_instance(this, callee_id)?;
457
458        let caller = self.current_application();
459        let caller_id = caller.id;
460        let caller_signer = caller.signer;
461        // Make the call to user code.
462        let authenticated_signer = match caller_signer {
463            Some(signer) if authenticated => Some(signer),
464            _ => None,
465        };
466        let authenticated_caller_id = authenticated.then_some(caller_id);
467        self.push_application(ApplicationStatus {
468            caller_id: authenticated_caller_id,
469            id: callee_id,
470            description: application.description,
471            // Allow further nested calls to be authenticated if this one is.
472            signer: authenticated_signer,
473        });
474        Ok(application.instance)
475    }
476
477    /// Cleans up the runtime after the execution of a call to a different contract.
478    fn finish_call(&mut self) -> Result<(), ExecutionError> {
479        self.pop_application();
480        Ok(())
481    }
482
483    /// Runs the service in a separate thread as an oracle.
484    fn run_service_oracle_query(
485        &mut self,
486        application_id: ApplicationId,
487        query: Vec<u8>,
488    ) -> Result<Vec<u8>, ExecutionError> {
489        let context = QueryContext {
490            chain_id: self.chain_id,
491            next_block_height: self.height,
492            local_time: self.transaction_tracker.local_time(),
493        };
494        let sender = self.execution_state_sender.clone();
495
496        let txn_tracker = TransactionTracker::default()
497            .with_blobs(self.transaction_tracker.created_blobs().clone());
498
499        let timeout = self
500            .resource_controller
501            .remaining_service_oracle_execution_time()?;
502        let execution_start = Instant::now();
503        let deadline = Some(execution_start + timeout);
504
505        let mut service_runtime =
506            ServiceSyncRuntime::new_with_txn_tracker(sender, context, deadline, txn_tracker);
507
508        let result = service_runtime.run_query(application_id, query);
509
510        // Always track the execution time, irrespective to whether the service ran successfully or
511        // timed out
512        self.resource_controller
513            .track_service_oracle_execution(execution_start.elapsed())?;
514
515        let QueryOutcome {
516            response,
517            operations,
518        } = result?;
519
520        self.resource_controller
521            .track_service_oracle_response(response.len())?;
522
523        self.scheduled_operations.extend(operations);
524        Ok(response)
525    }
526}
527
528impl SyncRuntimeInternal<UserServiceInstance> {
529    /// Initializes a service instance with this runtime.
530    fn load_service_instance(
531        &mut self,
532        this: ServiceSyncRuntimeHandle,
533        id: ApplicationId,
534    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
535        match self.loaded_applications.entry(id) {
536            // TODO(#2927): support dynamic loading of modules on the Web
537            #[cfg(web)]
538            hash_map::Entry::Vacant(_) => {
539                drop(this);
540                Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
541                    id,
542                )))
543            }
544            #[cfg(not(web))]
545            hash_map::Entry::Vacant(entry) => {
546                let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
547                let (code, description, txn_tracker_moved) = self
548                    .execution_state_sender
549                    .send_request(move |callback| ExecutionRequest::LoadService {
550                        id,
551                        callback,
552                        txn_tracker: txn_tracker_moved,
553                    })?
554                    .recv_response()?;
555                self.transaction_tracker = txn_tracker_moved;
556
557                let instance = code.instantiate(this)?;
558                Ok(entry
559                    .insert(LoadedApplication::new(instance, description))
560                    .clone())
561            }
562            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
563        }
564    }
565}
566
567impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
568    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
569        let handle = self.0.take().expect(
570            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
571        );
572        let runtime = Arc::into_inner(handle.0)?
573            .into_inner()
574            .expect("`SyncRuntime` should run in a single thread which should not panic");
575        Some(runtime)
576    }
577}
578
579impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
580    for SyncRuntimeHandle<UserInstance>
581{
582    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
583        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
584    }
585}
586
587impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
588    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
589        self.0
590            .try_lock()
591            .expect("Synchronous runtimes run on a single execution thread")
592    }
593}
594
595impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
596where
597    Self: ContractOrServiceRuntime,
598{
599    type Read = ();
600    type ReadValueBytes = u32;
601    type ContainsKey = u32;
602    type ContainsKeys = u32;
603    type ReadMultiValuesBytes = u32;
604    type FindKeysByPrefix = u32;
605    type FindKeyValuesByPrefix = u32;
606
607    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
608        let mut this = self.inner();
609        let chain_id = this.chain_id;
610        this.resource_controller.track_runtime_chain_id()?;
611        Ok(chain_id)
612    }
613
614    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
615        let mut this = self.inner();
616        let height = this.height;
617        this.resource_controller.track_runtime_block_height()?;
618        Ok(height)
619    }
620
621    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
622        let mut this = self.inner();
623        let application_id = this.current_application().id;
624        this.resource_controller.track_runtime_application_id()?;
625        Ok(application_id)
626    }
627
628    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
629        let mut this = self.inner();
630        let application_creator_chain_id = this.current_application().description.creator_chain_id;
631        this.resource_controller.track_runtime_application_id()?;
632        Ok(application_creator_chain_id)
633    }
634
635    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
636        let mut this = self.inner();
637        let parameters = this.current_application().description.parameters.clone();
638        this.resource_controller
639            .track_runtime_application_parameters(&parameters)?;
640        Ok(parameters)
641    }
642
643    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
644        let mut this = self.inner();
645        let timestamp = this
646            .execution_state_sender
647            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
648            .recv_response()?;
649        this.resource_controller.track_runtime_timestamp()?;
650        Ok(timestamp)
651    }
652
653    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
654        let mut this = self.inner();
655        let balance = this
656            .execution_state_sender
657            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
658            .recv_response()?;
659        this.resource_controller.track_runtime_balance()?;
660        Ok(balance)
661    }
662
663    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
664        let mut this = self.inner();
665        let balance = this
666            .execution_state_sender
667            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
668            .recv_response()?;
669        this.resource_controller.track_runtime_balance()?;
670        Ok(balance)
671    }
672
673    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
674        let mut this = self.inner();
675        let owner_balances = this
676            .execution_state_sender
677            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
678            .recv_response()?;
679        this.resource_controller
680            .track_runtime_owner_balances(&owner_balances)?;
681        Ok(owner_balances)
682    }
683
684    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
685        let mut this = self.inner();
686        let owners = this
687            .execution_state_sender
688            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
689            .recv_response()?;
690        this.resource_controller.track_runtime_owners(&owners)?;
691        Ok(owners)
692    }
693
694    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
695        let mut this = self.inner();
696        let chain_ownership = this
697            .execution_state_sender
698            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
699            .recv_response()?;
700        this.resource_controller
701            .track_runtime_chain_ownership(&chain_ownership)?;
702        Ok(chain_ownership)
703    }
704
705    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
706        let mut this = self.inner();
707        let id = this.current_application().id;
708        this.resource_controller.track_read_operation()?;
709        let receiver = this
710            .execution_state_sender
711            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
712        let state = this.view_user_states.entry(id).or_default();
713        state.contains_key_queries.register(receiver)
714    }
715
716    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
717        let mut this = self.inner();
718        let id = this.current_application().id;
719        let state = this.view_user_states.entry(id).or_default();
720        let value = state.contains_key_queries.wait(*promise)?;
721        Ok(value)
722    }
723
724    fn contains_keys_new(
725        &mut self,
726        keys: Vec<Vec<u8>>,
727    ) -> Result<Self::ContainsKeys, ExecutionError> {
728        let mut this = self.inner();
729        let id = this.current_application().id;
730        this.resource_controller.track_read_operation()?;
731        let receiver = this
732            .execution_state_sender
733            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
734        let state = this.view_user_states.entry(id).or_default();
735        state.contains_keys_queries.register(receiver)
736    }
737
738    fn contains_keys_wait(
739        &mut self,
740        promise: &Self::ContainsKeys,
741    ) -> Result<Vec<bool>, ExecutionError> {
742        let mut this = self.inner();
743        let id = this.current_application().id;
744        let state = this.view_user_states.entry(id).or_default();
745        let value = state.contains_keys_queries.wait(*promise)?;
746        Ok(value)
747    }
748
749    fn read_multi_values_bytes_new(
750        &mut self,
751        keys: Vec<Vec<u8>>,
752    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
753        let mut this = self.inner();
754        let id = this.current_application().id;
755        this.resource_controller.track_read_operation()?;
756        let receiver = this.execution_state_sender.send_request(move |callback| {
757            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
758        })?;
759        let state = this.view_user_states.entry(id).or_default();
760        state.read_multi_values_queries.register(receiver)
761    }
762
763    fn read_multi_values_bytes_wait(
764        &mut self,
765        promise: &Self::ReadMultiValuesBytes,
766    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
767        let mut this = self.inner();
768        let id = this.current_application().id;
769        let state = this.view_user_states.entry(id).or_default();
770        let values = state.read_multi_values_queries.wait(*promise)?;
771        for value in &values {
772            if let Some(value) = &value {
773                this.resource_controller
774                    .track_bytes_read(value.len() as u64)?;
775            }
776        }
777        Ok(values)
778    }
779
780    fn read_value_bytes_new(
781        &mut self,
782        key: Vec<u8>,
783    ) -> Result<Self::ReadValueBytes, ExecutionError> {
784        let mut this = self.inner();
785        let id = this.current_application().id;
786        this.resource_controller.track_read_operation()?;
787        let receiver = this
788            .execution_state_sender
789            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
790        let state = this.view_user_states.entry(id).or_default();
791        state.read_value_queries.register(receiver)
792    }
793
794    fn read_value_bytes_wait(
795        &mut self,
796        promise: &Self::ReadValueBytes,
797    ) -> Result<Option<Vec<u8>>, ExecutionError> {
798        let mut this = self.inner();
799        let id = this.current_application().id;
800        let value = {
801            let state = this.view_user_states.entry(id).or_default();
802            state.read_value_queries.wait(*promise)?
803        };
804        if let Some(value) = &value {
805            this.resource_controller
806                .track_bytes_read(value.len() as u64)?;
807        }
808        Ok(value)
809    }
810
811    fn find_keys_by_prefix_new(
812        &mut self,
813        key_prefix: Vec<u8>,
814    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
815        let mut this = self.inner();
816        let id = this.current_application().id;
817        this.resource_controller.track_read_operation()?;
818        let receiver = this.execution_state_sender.send_request(move |callback| {
819            ExecutionRequest::FindKeysByPrefix {
820                id,
821                key_prefix,
822                callback,
823            }
824        })?;
825        let state = this.view_user_states.entry(id).or_default();
826        state.find_keys_queries.register(receiver)
827    }
828
829    fn find_keys_by_prefix_wait(
830        &mut self,
831        promise: &Self::FindKeysByPrefix,
832    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
833        let mut this = self.inner();
834        let id = this.current_application().id;
835        let keys = {
836            let state = this.view_user_states.entry(id).or_default();
837            state.find_keys_queries.wait(*promise)?
838        };
839        let mut read_size = 0;
840        for key in &keys {
841            read_size += key.len();
842        }
843        this.resource_controller
844            .track_bytes_read(read_size as u64)?;
845        Ok(keys)
846    }
847
848    fn find_key_values_by_prefix_new(
849        &mut self,
850        key_prefix: Vec<u8>,
851    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
852        let mut this = self.inner();
853        let id = this.current_application().id;
854        this.resource_controller.track_read_operation()?;
855        let receiver = this.execution_state_sender.send_request(move |callback| {
856            ExecutionRequest::FindKeyValuesByPrefix {
857                id,
858                key_prefix,
859                callback,
860            }
861        })?;
862        let state = this.view_user_states.entry(id).or_default();
863        state.find_key_values_queries.register(receiver)
864    }
865
866    fn find_key_values_by_prefix_wait(
867        &mut self,
868        promise: &Self::FindKeyValuesByPrefix,
869    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
870        let mut this = self.inner();
871        let id = this.current_application().id;
872        let state = this.view_user_states.entry(id).or_default();
873        let key_values = state.find_key_values_queries.wait(*promise)?;
874        let mut read_size = 0;
875        for (key, value) in &key_values {
876            read_size += key.len() + value.len();
877        }
878        this.resource_controller
879            .track_bytes_read(read_size as u64)?;
880        Ok(key_values)
881    }
882
883    fn perform_http_request(
884        &mut self,
885        request: http::Request,
886    ) -> Result<http::Response, ExecutionError> {
887        let mut this = self.inner();
888        let app_permissions = this
889            .execution_state_sender
890            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
891            .recv_response()?;
892
893        let app_id = this.current_application().id;
894        ensure!(
895            app_permissions.can_make_http_requests(&app_id),
896            ExecutionError::UnauthorizedApplication(app_id)
897        );
898
899        this.resource_controller.track_http_request()?;
900
901        let response =
902            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
903                match response {
904                    OracleResponse::Http(response) => response,
905                    _ => return Err(ExecutionError::OracleResponseMismatch),
906                }
907            } else {
908                this.execution_state_sender
909                    .send_request(|callback| ExecutionRequest::PerformHttpRequest {
910                        request,
911                        http_responses_are_oracle_responses:
912                            Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
913                        callback,
914                    })?
915                    .recv_response()?
916            };
917        this.transaction_tracker
918            .add_oracle_response(OracleResponse::Http(response.clone()));
919        Ok(response)
920    }
921
922    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
923        let mut this = self.inner();
924        if !this
925            .transaction_tracker
926            .replay_oracle_response(OracleResponse::Assert)?
927        {
928            // There are no recorded oracle responses, so we check the local time.
929            let local_time = this.transaction_tracker.local_time();
930            ensure!(
931                local_time < timestamp,
932                ExecutionError::AssertBefore {
933                    timestamp,
934                    local_time,
935                }
936            );
937        }
938        Ok(())
939    }
940
941    fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError> {
942        let mut this = self.inner();
943        let blob_id = BlobId::new(*hash, BlobType::Data);
944        let (blob_content, is_new) = this
945            .execution_state_sender
946            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
947            .recv_response()?;
948        if is_new {
949            this.transaction_tracker
950                .replay_oracle_response(OracleResponse::Blob(blob_id))?;
951        }
952        Ok(blob_content.into_bytes().into_vec())
953    }
954
955    fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
956        let mut this = self.inner();
957        let blob_id = BlobId::new(*hash, BlobType::Data);
958        let is_new = this
959            .execution_state_sender
960            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
961            .recv_response()?;
962        if is_new {
963            this.transaction_tracker
964                .replay_oracle_response(OracleResponse::Blob(blob_id))?;
965        }
966        Ok(())
967    }
968}
969
/// An extension trait to determine at compile time the different behaviors between contracts
/// and services in the implementation of [`BaseRuntime`].
trait ContractOrServiceRuntime {
    /// Configured to `true` if the HTTP response size should be limited to the oracle response
    /// size.
    ///
    /// This is `false` for services, potentially allowing them to receive a larger HTTP response
    /// and only store a shorter oracle response in the block.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
980
impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
    // Contracts have their HTTP response size limited to the oracle response size.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
}
984
impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
    // Services do not have their HTTP response size limited to the oracle response size.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
}
988
989impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
990    fn clone(&self) -> Self {
991        SyncRuntimeHandle(self.0.clone())
992    }
993}
994
995impl ContractSyncRuntime {
996    pub(crate) fn new(
997        execution_state_sender: ExecutionStateSender,
998        chain_id: ChainId,
999        refund_grant_to: Option<Account>,
1000        resource_controller: ResourceController,
1001        action: &UserAction,
1002        txn_tracker: TransactionTracker,
1003    ) -> Self {
1004        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1005            SyncRuntimeInternal::new(
1006                chain_id,
1007                action.height(),
1008                action.round(),
1009                action.signer(),
1010                if let UserAction::Message(context, _) = action {
1011                    Some(context.into())
1012                } else {
1013                    None
1014                },
1015                execution_state_sender,
1016                None,
1017                refund_grant_to,
1018                resource_controller,
1019                txn_tracker,
1020                action.timestamp(),
1021            ),
1022        )))
1023    }
1024
1025    pub(crate) fn preload_contract(
1026        &self,
1027        id: ApplicationId,
1028        code: UserContractCode,
1029        description: ApplicationDescription,
1030    ) -> Result<(), ExecutionError> {
1031        let this = self
1032            .0
1033            .as_ref()
1034            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1035        let runtime_handle = this.clone();
1036        let mut this_guard = this.inner();
1037
1038        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1039            entry.insert(LoadedApplication::new(
1040                code.instantiate(runtime_handle)?,
1041                description,
1042            ));
1043            this_guard.applications_to_finalize.push(id);
1044        }
1045
1046        Ok(())
1047    }
1048
1049    /// Main entry point to start executing a user action.
1050    pub(crate) fn run_action(
1051        mut self,
1052        application_id: ApplicationId,
1053        chain_id: ChainId,
1054        action: UserAction,
1055    ) -> Result<(Option<Vec<u8>>, ResourceController, TransactionTracker), ExecutionError> {
1056        let result = self
1057            .deref_mut()
1058            .run_action(application_id, chain_id, action)?;
1059        let runtime = self
1060            .into_inner()
1061            .expect("Runtime clones should have been freed by now");
1062        Ok((
1063            result,
1064            runtime.resource_controller,
1065            runtime.transaction_tracker,
1066        ))
1067    }
1068}
1069
1070impl ContractSyncRuntimeHandle {
1071    fn run_action(
1072        &mut self,
1073        application_id: ApplicationId,
1074        chain_id: ChainId,
1075        action: UserAction,
1076    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1077        let finalize_context = FinalizeContext {
1078            authenticated_signer: action.signer(),
1079            chain_id,
1080            height: action.height(),
1081            round: action.round(),
1082        };
1083
1084        {
1085            let runtime = self.inner();
1086            assert_eq!(runtime.authenticated_signer, action.signer());
1087            assert_eq!(runtime.chain_id, chain_id);
1088            assert_eq!(runtime.height, action.height());
1089        }
1090
1091        let signer = action.signer();
1092        let closure = move |code: &mut UserContractInstance| match action {
1093            UserAction::Instantiate(_context, argument) => {
1094                code.instantiate(argument).map(|()| None)
1095            }
1096            UserAction::Operation(_context, operation) => {
1097                code.execute_operation(operation).map(Option::Some)
1098            }
1099            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1100            UserAction::ProcessStreams(_context, updates) => {
1101                code.process_streams(updates).map(|()| None)
1102            }
1103        };
1104
1105        let result = self.execute(application_id, signer, closure)?;
1106        self.finalize(finalize_context)?;
1107        Ok(result)
1108    }
1109
1110    /// Notifies all loaded applications that execution is finalizing.
1111    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1112        let applications = mem::take(&mut self.inner().applications_to_finalize)
1113            .into_iter()
1114            .rev();
1115
1116        self.inner().is_finalizing = true;
1117
1118        for application in applications {
1119            self.execute(application, context.authenticated_signer, |contract| {
1120                contract.finalize().map(|_| None)
1121            })?;
1122            self.inner().loaded_applications.remove(&application);
1123        }
1124
1125        Ok(())
1126    }
1127
1128    /// Executes a `closure` with the contract code for the `application_id`.
1129    fn execute(
1130        &mut self,
1131        application_id: ApplicationId,
1132        signer: Option<AccountOwner>,
1133        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1134    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1135        let contract = {
1136            let mut runtime = self.inner();
1137            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1138
1139            let status = ApplicationStatus {
1140                caller_id: None,
1141                id: application_id,
1142                description: application.description.clone(),
1143                signer,
1144            };
1145
1146            runtime.push_application(status);
1147
1148            application
1149        };
1150
1151        let result = closure(
1152            &mut contract
1153                .instance
1154                .try_lock()
1155                .expect("Application should not be already executing"),
1156        )?;
1157
1158        let mut runtime = self.inner();
1159        let application_status = runtime.pop_application();
1160        assert_eq!(application_status.caller_id, None);
1161        assert_eq!(application_status.id, application_id);
1162        assert_eq!(application_status.description, contract.description);
1163        assert_eq!(application_status.signer, signer);
1164        assert!(runtime.call_stack.is_empty());
1165
1166        Ok(result)
1167    }
1168}
1169
1170impl ContractRuntime for ContractSyncRuntimeHandle {
1171    fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1172        Ok(self.inner().authenticated_signer)
1173    }
1174
1175    fn message_id(&mut self) -> Result<Option<MessageId>, ExecutionError> {
1176        Ok(self.inner().executing_message.map(|metadata| metadata.id))
1177    }
1178
1179    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1180        Ok(self
1181            .inner()
1182            .executing_message
1183            .map(|metadata| metadata.is_bouncing))
1184    }
1185
1186    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1187        let this = self.inner();
1188        if this.call_stack.len() <= 1 {
1189            return Ok(None);
1190        }
1191        Ok(this.current_application().caller_id)
1192    }
1193
1194    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1195        Ok(match vm_runtime {
1196            VmRuntime::Wasm => {
1197                self.inner()
1198                    .resource_controller
1199                    .policy()
1200                    .maximum_wasm_fuel_per_block
1201            }
1202            VmRuntime::Evm => {
1203                self.inner()
1204                    .resource_controller
1205                    .policy()
1206                    .maximum_evm_fuel_per_block
1207            }
1208        })
1209    }
1210
1211    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1212        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1213    }
1214
1215    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1216        let mut this = self.inner();
1217        this.resource_controller.track_fuel(fuel, vm_runtime)
1218    }
1219
1220    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1221        let mut this = self.inner();
1222        let application = this.current_application();
1223        let application_id = application.id;
1224        let authenticated_signer = application.signer;
1225        let mut refund_grant_to = this.refund_grant_to;
1226
1227        let grant = this
1228            .resource_controller
1229            .policy()
1230            .total_price(&message.grant)?;
1231        if grant.is_zero() {
1232            refund_grant_to = None;
1233        } else {
1234            this.resource_controller.track_grant(grant)?;
1235        }
1236        let kind = if message.is_tracked {
1237            MessageKind::Tracked
1238        } else {
1239            MessageKind::Simple
1240        };
1241
1242        this.transaction_tracker
1243            .add_outgoing_message(OutgoingMessage {
1244                destination: message.destination,
1245                authenticated_signer,
1246                refund_grant_to,
1247                grant,
1248                kind,
1249                message: Message::User {
1250                    application_id,
1251                    bytes: message.message,
1252                },
1253            })?;
1254
1255        Ok(())
1256    }
1257
1258    fn transfer(
1259        &mut self,
1260        source: AccountOwner,
1261        destination: Account,
1262        amount: Amount,
1263    ) -> Result<(), ExecutionError> {
1264        let mut this = self.inner();
1265        let current_application = this.current_application();
1266        let application_id = current_application.id;
1267        let signer = current_application.signer;
1268
1269        let maybe_message = this
1270            .execution_state_sender
1271            .send_request(|callback| ExecutionRequest::Transfer {
1272                source,
1273                destination,
1274                amount,
1275                signer,
1276                application_id,
1277                callback,
1278            })?
1279            .recv_response()?;
1280
1281        this.transaction_tracker
1282            .add_outgoing_messages(maybe_message)?;
1283        Ok(())
1284    }
1285
1286    fn claim(
1287        &mut self,
1288        source: Account,
1289        destination: Account,
1290        amount: Amount,
1291    ) -> Result<(), ExecutionError> {
1292        let mut this = self.inner();
1293        let current_application = this.current_application();
1294        let application_id = current_application.id;
1295        let signer = current_application.signer;
1296
1297        let message = this
1298            .execution_state_sender
1299            .send_request(|callback| ExecutionRequest::Claim {
1300                source,
1301                destination,
1302                amount,
1303                signer,
1304                application_id,
1305                callback,
1306            })?
1307            .recv_response()?;
1308        this.transaction_tracker.add_outgoing_message(message)?;
1309        Ok(())
1310    }
1311
1312    fn try_call_application(
1313        &mut self,
1314        authenticated: bool,
1315        callee_id: ApplicationId,
1316        argument: Vec<u8>,
1317    ) -> Result<Vec<u8>, ExecutionError> {
1318        let contract = self
1319            .inner()
1320            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1321
1322        let value = contract
1323            .try_lock()
1324            .expect("Applications should not have reentrant calls")
1325            .execute_operation(argument)?;
1326
1327        self.inner().finish_call()?;
1328
1329        Ok(value)
1330    }
1331
1332    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1333        let mut this = self.inner();
1334        ensure!(
1335            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1336            ExecutionError::StreamNameTooLong
1337        );
1338        let application_id = GenericApplicationId::User(this.current_application().id);
1339        let stream_id = StreamId {
1340            stream_name,
1341            application_id,
1342        };
1343        let index = this
1344            .execution_state_sender
1345            .send_request(|callback| ExecutionRequest::NextEventIndex {
1346                stream_id: stream_id.clone(),
1347                callback,
1348            })?
1349            .recv_response()?;
1350        // TODO(#365): Consider separate event fee categories.
1351        this.resource_controller
1352            .track_bytes_written(value.len() as u64)?;
1353        this.transaction_tracker.add_event(stream_id, index, value);
1354        Ok(index)
1355    }
1356
1357    fn read_event(
1358        &mut self,
1359        chain_id: ChainId,
1360        stream_name: StreamName,
1361        index: u32,
1362    ) -> Result<Vec<u8>, ExecutionError> {
1363        let mut this = self.inner();
1364        ensure!(
1365            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1366            ExecutionError::StreamNameTooLong
1367        );
1368        let application_id = GenericApplicationId::User(this.current_application().id);
1369        let stream_id = StreamId {
1370            stream_name,
1371            application_id,
1372        };
1373        let event_id = EventId {
1374            stream_id,
1375            index,
1376            chain_id,
1377        };
1378        let event = this
1379            .execution_state_sender
1380            .send_request(|callback| ExecutionRequest::ReadEvent {
1381                event_id: event_id.clone(),
1382                callback,
1383            })?
1384            .recv_response()?;
1385        // TODO(#365): Consider separate event fee categories.
1386        this.resource_controller
1387            .track_bytes_read(event.len() as u64)?;
1388        this.transaction_tracker
1389            .replay_oracle_response(OracleResponse::Event(event_id, event.clone()))?;
1390        Ok(event)
1391    }
1392
1393    fn subscribe_to_events(
1394        &mut self,
1395        chain_id: ChainId,
1396        application_id: ApplicationId,
1397        stream_name: StreamName,
1398    ) -> Result<(), ExecutionError> {
1399        let mut this = self.inner();
1400        ensure!(
1401            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1402            ExecutionError::StreamNameTooLong
1403        );
1404        let stream_id = StreamId {
1405            stream_name,
1406            application_id: application_id.into(),
1407        };
1408        let subscriber_app_id = this.current_application().id;
1409        let next_index = this
1410            .execution_state_sender
1411            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1412                chain_id,
1413                stream_id: stream_id.clone(),
1414                subscriber_app_id,
1415                callback,
1416            })?
1417            .recv_response()?;
1418        this.transaction_tracker.add_stream_to_process(
1419            subscriber_app_id,
1420            chain_id,
1421            stream_id,
1422            0,
1423            next_index,
1424        );
1425        Ok(())
1426    }
1427
1428    fn unsubscribe_from_events(
1429        &mut self,
1430        chain_id: ChainId,
1431        application_id: ApplicationId,
1432        stream_name: StreamName,
1433    ) -> Result<(), ExecutionError> {
1434        let mut this = self.inner();
1435        ensure!(
1436            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1437            ExecutionError::StreamNameTooLong
1438        );
1439        let stream_id = StreamId {
1440            stream_name,
1441            application_id: application_id.into(),
1442        };
1443        let subscriber_app_id = this.current_application().id;
1444        this.execution_state_sender
1445            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1446                chain_id,
1447                stream_id: stream_id.clone(),
1448                subscriber_app_id,
1449                callback,
1450            })?
1451            .recv_response()?;
1452        this.transaction_tracker
1453            .remove_stream_to_process(application_id, chain_id, stream_id);
1454        Ok(())
1455    }
1456
1457    fn query_service(
1458        &mut self,
1459        application_id: ApplicationId,
1460        query: Vec<u8>,
1461    ) -> Result<Vec<u8>, ExecutionError> {
1462        let mut this = self.inner();
1463
1464        let app_permissions = this
1465            .execution_state_sender
1466            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1467            .recv_response()?;
1468
1469        let app_id = this.current_application().id;
1470        ensure!(
1471            app_permissions.can_call_services(&app_id),
1472            ExecutionError::UnauthorizedApplication(app_id)
1473        );
1474
1475        this.resource_controller.track_service_oracle_call()?;
1476        let response =
1477            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1478                match response {
1479                    OracleResponse::Service(bytes) => bytes,
1480                    _ => return Err(ExecutionError::OracleResponseMismatch),
1481                }
1482            } else {
1483                this.run_service_oracle_query(application_id, query)?
1484            };
1485
1486        this.transaction_tracker
1487            .add_oracle_response(OracleResponse::Service(response.clone()));
1488
1489        Ok(response)
1490    }
1491
1492    fn open_chain(
1493        &mut self,
1494        ownership: ChainOwnership,
1495        application_permissions: ApplicationPermissions,
1496        balance: Amount,
1497    ) -> Result<ChainId, ExecutionError> {
1498        let parent_id = self.inner().chain_id;
1499        let block_height = self.block_height()?;
1500
1501        let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1502        let timestamp = self.inner().user_context;
1503
1504        let (chain_id, txn_tracker_moved) = self
1505            .inner()
1506            .execution_state_sender
1507            .send_request(|callback| ExecutionRequest::OpenChain {
1508                ownership,
1509                balance,
1510                parent_id,
1511                block_height,
1512                timestamp,
1513                application_permissions,
1514                callback,
1515                txn_tracker: txn_tracker_moved,
1516            })?
1517            .recv_response()?;
1518
1519        self.inner().transaction_tracker = txn_tracker_moved;
1520
1521        Ok(chain_id)
1522    }
1523
1524    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1525        let this = self.inner();
1526        let application_id = this.current_application().id;
1527        this.execution_state_sender
1528            .send_request(|callback| ExecutionRequest::CloseChain {
1529                application_id,
1530                callback,
1531            })?
1532            .recv_response()?
1533    }
1534
1535    fn change_application_permissions(
1536        &mut self,
1537        application_permissions: ApplicationPermissions,
1538    ) -> Result<(), ExecutionError> {
1539        let this = self.inner();
1540        let application_id = this.current_application().id;
1541        this.execution_state_sender
1542            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1543                application_id,
1544                application_permissions,
1545                callback,
1546            })?
1547            .recv_response()?
1548    }
1549
1550    fn create_application(
1551        &mut self,
1552        module_id: ModuleId,
1553        parameters: Vec<u8>,
1554        argument: Vec<u8>,
1555        required_application_ids: Vec<ApplicationId>,
1556    ) -> Result<ApplicationId, ExecutionError> {
1557        let chain_id = self.inner().chain_id;
1558        let block_height = self.block_height()?;
1559
1560        let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1561
1562        let CreateApplicationResult {
1563            app_id,
1564            txn_tracker: txn_tracker_moved,
1565        } = self
1566            .inner()
1567            .execution_state_sender
1568            .send_request(move |callback| ExecutionRequest::CreateApplication {
1569                chain_id,
1570                block_height,
1571                module_id,
1572                parameters,
1573                required_application_ids,
1574                callback,
1575                txn_tracker: txn_tracker_moved,
1576            })?
1577            .recv_response()??;
1578
1579        self.inner().transaction_tracker = txn_tracker_moved;
1580
1581        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1582
1583        contract
1584            .try_lock()
1585            .expect("Applications should not have reentrant calls")
1586            .instantiate(argument)?;
1587
1588        self.inner().finish_call()?;
1589
1590        Ok(app_id)
1591    }
1592
1593    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<BlobId, ExecutionError> {
1594        let blob = Blob::new_data(bytes);
1595        let blob_id = blob.id();
1596        self.inner().transaction_tracker.add_created_blob(blob);
1597        Ok(blob_id)
1598    }
1599
1600    fn publish_module(
1601        &mut self,
1602        contract: Bytecode,
1603        service: Bytecode,
1604        vm_runtime: VmRuntime,
1605    ) -> Result<ModuleId, ExecutionError> {
1606        let (blobs, module_id) =
1607            crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1608        for blob in blobs {
1609            self.inner().transaction_tracker.add_created_blob(blob);
1610        }
1611        Ok(module_id)
1612    }
1613
1614    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1615        let mut this = self.inner();
1616        let round =
1617            if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1618                match response {
1619                    OracleResponse::Round(round) => round,
1620                    _ => return Err(ExecutionError::OracleResponseMismatch),
1621                }
1622            } else {
1623                this.round
1624            };
1625        this.transaction_tracker
1626            .add_oracle_response(OracleResponse::Round(round));
1627        Ok(round)
1628    }
1629
1630    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1631        let mut this = self.inner();
1632        let id = this.current_application().id;
1633        let state = this.view_user_states.entry(id).or_default();
1634        state.force_all_pending_queries()?;
1635        this.resource_controller.track_write_operations(
1636            batch
1637                .num_operations()
1638                .try_into()
1639                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1640        )?;
1641        this.resource_controller
1642            .track_bytes_written(batch.size() as u64)?;
1643        this.execution_state_sender
1644            .send_request(|callback| ExecutionRequest::WriteBatch {
1645                id,
1646                batch,
1647                callback,
1648            })?
1649            .recv_response()?;
1650        Ok(())
1651    }
1652}
1653
1654impl ServiceSyncRuntime {
1655    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1656    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1657        let mut txn_tracker = TransactionTracker::default();
1658        txn_tracker.set_local_time(context.local_time);
1659        Self::new_with_txn_tracker(execution_state_sender, context, None, txn_tracker)
1660    }
1661
1662    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1663    pub fn new_with_txn_tracker(
1664        execution_state_sender: ExecutionStateSender,
1665        context: QueryContext,
1666        deadline: Option<Instant>,
1667        txn_tracker: TransactionTracker,
1668    ) -> Self {
1669        let runtime = SyncRuntime(Some(
1670            SyncRuntimeInternal::new(
1671                context.chain_id,
1672                context.next_block_height,
1673                None,
1674                None,
1675                None,
1676                execution_state_sender,
1677                deadline,
1678                None,
1679                ResourceController::default(),
1680                txn_tracker,
1681                (),
1682            )
1683            .into(),
1684        ));
1685
1686        ServiceSyncRuntime {
1687            runtime,
1688            current_context: context,
1689        }
1690    }
1691
1692    /// Loads a service into the runtime's memory.
1693    pub(crate) fn preload_service(
1694        &self,
1695        id: ApplicationId,
1696        code: UserServiceCode,
1697        description: ApplicationDescription,
1698    ) -> Result<(), ExecutionError> {
1699        let this = self
1700            .runtime
1701            .0
1702            .as_ref()
1703            .expect("services shouldn't be preloaded while the runtime is being dropped");
1704        let runtime_handle = this.clone();
1705        let mut this_guard = this.inner();
1706
1707        if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1708            entry.insert(LoadedApplication::new(
1709                code.instantiate(runtime_handle)?,
1710                description,
1711            ));
1712            this_guard.applications_to_finalize.push(id);
1713        }
1714
1715        Ok(())
1716    }
1717
1718    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1719    pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1720        while let Ok(request) = incoming_requests.recv() {
1721            let ServiceRuntimeRequest::Query {
1722                application_id,
1723                context,
1724                query,
1725                callback,
1726            } = request;
1727
1728            self.prepare_for_query(context);
1729
1730            let _ = callback.send(self.run_query(application_id, query));
1731        }
1732    }
1733
1734    /// Prepares the runtime to query an application.
1735    pub(crate) fn prepare_for_query(&mut self, new_context: QueryContext) {
1736        let expected_context = QueryContext {
1737            local_time: new_context.local_time,
1738            ..self.current_context
1739        };
1740
1741        if new_context != expected_context {
1742            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1743            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1744        } else {
1745            self.handle_mut()
1746                .inner()
1747                .transaction_tracker
1748                .set_local_time(new_context.local_time);
1749        }
1750    }
1751
1752    /// Queries an application specified by its [`ApplicationId`].
1753    pub(crate) fn run_query(
1754        &mut self,
1755        application_id: ApplicationId,
1756        query: Vec<u8>,
1757    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1758        let this = self.handle_mut();
1759        let response = this.try_query_application(application_id, query)?;
1760        let operations = mem::take(&mut this.inner().scheduled_operations);
1761
1762        Ok(QueryOutcome {
1763            response,
1764            operations,
1765        })
1766    }
1767
1768    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1769    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1770        self.runtime.0.as_mut().expect(
1771            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1772        )
1773    }
1774}
1775
1776impl ServiceRuntime for ServiceSyncRuntimeHandle {
1777    /// Note that queries are not available from writable contexts.
1778    fn try_query_application(
1779        &mut self,
1780        queried_id: ApplicationId,
1781        argument: Vec<u8>,
1782    ) -> Result<Vec<u8>, ExecutionError> {
1783        let service = {
1784            let mut this = self.inner();
1785
1786            // Load the application.
1787            let application = this.load_service_instance(self.clone(), queried_id)?;
1788            // Make the call to user code.
1789            this.push_application(ApplicationStatus {
1790                caller_id: None,
1791                id: queried_id,
1792                description: application.description,
1793                signer: None,
1794            });
1795            application.instance
1796        };
1797        let response = service
1798            .try_lock()
1799            .expect("Applications should not have reentrant calls")
1800            .handle_query(argument)?;
1801        self.inner().pop_application();
1802        Ok(response)
1803    }
1804
1805    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1806        let mut this = self.inner();
1807        let application_id = this.current_application().id;
1808
1809        this.scheduled_operations.push(Operation::User {
1810            application_id,
1811            bytes: operation,
1812        });
1813
1814        Ok(())
1815    }
1816
1817    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1818        if let Some(deadline) = self.inner().deadline {
1819            if Instant::now() >= deadline {
1820                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1821            }
1822        }
1823        Ok(())
1824    }
1825}
1826
1827/// A request to the service runtime actor.
1828pub enum ServiceRuntimeRequest {
1829    Query {
1830        application_id: ApplicationId,
1831        context: QueryContext,
1832        query: Vec<u8>,
1833        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1834    },
1835}
1836
1837/// The origin of the execution.
1838#[derive(Clone, Copy, Debug)]
1839struct ExecutingMessage {
1840    id: MessageId,
1841    is_bouncing: bool,
1842}
1843
1844impl From<&MessageContext> for ExecutingMessage {
1845    fn from(context: &MessageContext) -> Self {
1846        ExecutingMessage {
1847            id: context.message_id,
1848            is_bouncing: context.is_bouncing,
1849        }
1850    }
1851}
1852
1853/// Creates a compressed contract and service bytecode synchronously.
1854pub fn create_bytecode_blobs_sync(
1855    contract: Bytecode,
1856    service: Bytecode,
1857    vm_runtime: VmRuntime,
1858) -> (Vec<Blob>, ModuleId) {
1859    match vm_runtime {
1860        VmRuntime::Wasm => {
1861            let compressed_contract = contract.compress();
1862            let compressed_service = service.compress();
1863            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1864            let service_blob = Blob::new_service_bytecode(compressed_service);
1865            let module_id =
1866                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1867            (vec![contract_blob, service_blob], module_id)
1868        }
1869        VmRuntime::Evm => {
1870            let compressed_contract = contract.compress();
1871            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1872            let module_id = ModuleId::new(
1873                evm_contract_blob.id().hash,
1874                evm_contract_blob.id().hash,
1875                vm_runtime,
1876            );
1877            (vec![evm_contract_blob], module_id)
1878        }
1879    }
1880}