linera_execution/
runtime.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{hash_map, BTreeMap, HashMap, HashSet},
6    mem,
7    ops::{Deref, DerefMut},
8    sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13    data_types::{
14        Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15        SendMessageRequest, Timestamp,
16    },
17    ensure, http,
18    identifiers::{
19        Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20    },
21    ownership::ChainOwnership,
22    time::Instant,
23    vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27use tracing::instrument;
28
29use crate::{
30    execution::UserAction,
31    execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32    resources::ResourceController,
33    system::CreateApplicationResult,
34    util::{ReceiverExt, UnboundedSenderExt},
35    ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
36    ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
37    OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
38    UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
/// Associates a user application instance type with the companion types the runtime
/// stores for it.
pub trait WithContext {
    /// Extra context kept in the runtime's `user_context` field for this kind of instance.
    type UserContext;
    /// The type of the application code from which instances are created.
    type Code;
}
49
/// Contract instances use a [`Timestamp`] as extra runtime context and are created from
/// [`UserContractCode`].
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
    type Code = UserContractCode;
}
54
/// Service instances carry no extra runtime context and are created from
/// [`UserServiceCode`].
impl WithContext for UserServiceInstance {
    type UserContext = ();
    type Code = UserServiceCode;
}
59
/// Test-only implementation for a type-erased instance, with neither extra context nor
/// code.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
    type Code = ();
}
65
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The handle is stored in an [`Option`] so that it can be moved out; dereferencing the
/// wrapper after the handle was moved out panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
68
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
70
/// A [`SyncRuntime`] for service instances, paired with a [`QueryContext`].
pub struct ServiceSyncRuntime {
    /// The wrapped runtime for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context currently associated with this runtime.
    current_context: QueryContext,
}
75
/// A handle to a [`SyncRuntimeInternal`] stored behind an [`Arc`] and a [`Mutex`].
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
80
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
83
/// Runtime data tracked during the execution of a transaction on the synchronous thread.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The current chain ID.
    chain_id: ChainId,
    /// The height of the next block that will be added to this chain. During operations
    /// and messages, this is the current block height.
    height: BlockHeight,
    /// The current consensus round. Only available during block validation in multi-leader rounds.
    round: Option<u32>,
    /// The current message being executed, if there is one.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// How to interact with the storage view of the execution state.
    execution_state_sender: ExecutionStateSender,

    /// If applications are being finalized.
    ///
    /// If [`true`], disables cross-application calls.
    is_finalizing: bool,
    /// Applications that need to be finalized.
    applications_to_finalize: Vec<ApplicationId>,

    /// Application code and descriptions that are already available, keyed by application
    /// ID. Consulted before requesting the code from the execution state when an
    /// application is first used.
    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
    /// Application instances loaded in this transaction.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The current stack of application statuses; the last entry is the current application.
    call_stack: Vec<ApplicationStatus>,
    /// The set of the IDs of the applications that are in the `call_stack`.
    active_applications: HashSet<ApplicationId>,
    /// The operations scheduled during this query.
    scheduled_operations: Vec<Operation>,

    /// Track application states based on views.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// The deadline this runtime should finish executing.
    ///
    /// Used to limit the execution time of services running as oracles.
    deadline: Option<Instant>,

    /// Where to send a refund for the unused part of the grant after execution, if any.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Controller to track fuel and storage consumption.
    resource_controller: ResourceController,
    /// Additional context for the runtime.
    user_context: UserInstance::UserContext,
    /// Whether contract log messages should be output.
    allow_application_logs: bool,
}
137
/// The runtime status of an application.
///
/// One entry is kept on the runtime's `call_stack` for every application in the current
/// chain of calls.
#[derive(Debug)]
struct ApplicationStatus {
    /// The caller application ID, if forwarded during the call.
    ///
    /// For cross-application calls this is only set when the call is authenticated.
    caller_id: Option<ApplicationId>,
    /// The application ID.
    id: ApplicationId,
    /// The application description.
    description: ApplicationDescription,
    /// The authenticated owner for the execution thread, if any.
    signer: Option<AccountOwner>,
}
150
/// A loaded application instance.
///
/// The instance is stored behind an [`Arc`] and a [`Mutex`], so cloning the entry shares
/// the same instance.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The shared application instance.
    instance: Arc<Mutex<Instance>>,
    /// The description of the application the instance belongs to.
    description: ApplicationDescription,
}
157
158impl<Instance> LoadedApplication<Instance> {
159    /// Creates a new [`LoadedApplication`] entry from the `instance` and its `description`.
160    fn new(instance: Instance, description: ApplicationDescription) -> Self {
161        LoadedApplication {
162            instance: Arc::new(Mutex::new(instance)),
163            description,
164        }
165    }
166}
167
168impl<Instance> Clone for LoadedApplication<Instance> {
169    // Manual implementation is needed to prevent the derive macro from adding an `Instance: Clone`
170    // bound
171    fn clone(&self) -> Self {
172        LoadedApplication {
173            instance: self.instance.clone(),
174            description: self.description.clone(),
175        }
176    }
177}
178
/// A value that is either already available or still expected from a [`Receiver`].
#[derive(Debug)]
enum Promise<T> {
    /// The value has already been received.
    Ready(T),
    /// The value has not been received yet; it will arrive through the receiver.
    Pending(Receiver<T>),
}
184
185impl<T> Promise<T> {
186    fn force(&mut self) -> Result<(), ExecutionError> {
187        if let Promise::Pending(receiver) = self {
188            let value = receiver
189                .recv_ref()
190                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
191            *self = Promise::Ready(value);
192        }
193        Ok(())
194    }
195
196    fn read(self) -> Result<T, ExecutionError> {
197        match self {
198            Promise::Pending(receiver) => {
199                let value = receiver.recv_response()?;
200                Ok(value)
201            }
202            Promise::Ready(value) => Ok(value),
203        }
204    }
205}
206
/// Manages a set of pending queries returning values of type `T`.
///
/// Each registered query is identified by a `u32` index that is handed back to the
/// caller and later used to wait for the result.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The queries in progress.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries ever registered so far. Used for the index of the next query.
    query_count: u32,
    /// The number of active queries.
    active_query_count: u32,
}
217
218impl<T> QueryManager<T> {
219    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
220        let id = self.query_count;
221        self.pending_queries.insert(id, Promise::Pending(receiver));
222        self.query_count = self
223            .query_count
224            .checked_add(1)
225            .ok_or(ArithmeticError::Overflow)?;
226        self.active_query_count = self
227            .active_query_count
228            .checked_add(1)
229            .ok_or(ArithmeticError::Overflow)?;
230        Ok(id)
231    }
232
233    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
234        let promise = self
235            .pending_queries
236            .remove(&id)
237            .ok_or(ExecutionError::InvalidPromise)?;
238        let value = promise.read()?;
239        self.active_query_count -= 1;
240        Ok(value)
241    }
242
243    fn force_all(&mut self) -> Result<(), ExecutionError> {
244        for promise in self.pending_queries.values_mut() {
245            promise.force()?;
246        }
247        Ok(())
248    }
249}
250
/// A list of keys, each a byte vector.
type Keys = Vec<Vec<u8>>;
/// A value, as a byte vector.
type Value = Vec<u8>;
/// A list of key-value pairs, each side a byte vector.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
254
/// The pending key-value queries tracked for one application, grouped by query kind.
#[derive(Debug, Default)]
struct ViewUserState {
    /// The contains-key queries in progress.
    contains_key_queries: QueryManager<bool>,
    /// The contains-keys queries in progress.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// The read-value queries in progress.
    read_value_queries: QueryManager<Option<Value>>,
    /// The read-multi-values queries in progress.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// The find-keys queries in progress.
    find_keys_queries: QueryManager<Keys>,
    /// The find-key-values queries in progress.
    find_key_values_queries: QueryManager<KeyValues>,
}
270
271impl ViewUserState {
272    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
273        self.contains_key_queries.force_all()?;
274        self.contains_keys_queries.force_all()?;
275        self.read_value_queries.force_all()?;
276        self.read_multi_values_queries.force_all()?;
277        self.find_keys_queries.force_all()?;
278        self.find_key_values_queries.force_all()?;
279        Ok(())
280    }
281}
282
283impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
284    type Target = SyncRuntimeHandle<UserInstance>;
285
286    fn deref(&self) -> &Self::Target {
287        self.0.as_ref().expect(
288            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
289        )
290    }
291}
292
293impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
294    fn deref_mut(&mut self) -> &mut Self::Target {
295        self.0.as_mut().expect(
296            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
297        )
298    }
299}
300
301impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
302    fn drop(&mut self) {
303        // Ensure the `loaded_applications` are cleared to prevent circular references in
304        // the runtime
305        if let Some(handle) = self.0.take() {
306            handle.inner().loaded_applications.clear();
307        }
308    }
309}
310
311impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
312    #[expect(clippy::too_many_arguments)]
313    fn new(
314        chain_id: ChainId,
315        height: BlockHeight,
316        round: Option<u32>,
317        executing_message: Option<ExecutingMessage>,
318        execution_state_sender: ExecutionStateSender,
319        deadline: Option<Instant>,
320        refund_grant_to: Option<Account>,
321        resource_controller: ResourceController,
322        user_context: UserInstance::UserContext,
323        allow_application_logs: bool,
324    ) -> Self {
325        Self {
326            chain_id,
327            height,
328            round,
329            executing_message,
330            execution_state_sender,
331            is_finalizing: false,
332            applications_to_finalize: Vec::new(),
333            preloaded_applications: HashMap::new(),
334            loaded_applications: HashMap::new(),
335            call_stack: Vec::new(),
336            active_applications: HashSet::new(),
337            view_user_states: BTreeMap::new(),
338            deadline,
339            refund_grant_to,
340            resource_controller,
341            scheduled_operations: Vec::new(),
342            user_context,
343            allow_application_logs,
344        }
345    }
346
347    /// Returns the [`ApplicationStatus`] of the current application.
348    ///
349    /// The current application is the last to be pushed to the `call_stack`.
350    ///
351    /// # Panics
352    ///
353    /// If the call stack is empty.
354    fn current_application(&self) -> &ApplicationStatus {
355        self.call_stack
356            .last()
357            .expect("Call stack is unexpectedly empty")
358    }
359
360    /// Inserts a new [`ApplicationStatus`] to the end of the `call_stack`.
361    ///
362    /// Ensures the application's ID is also tracked in the `active_applications` set.
363    fn push_application(&mut self, status: ApplicationStatus) {
364        self.active_applications.insert(status.id);
365        self.call_stack.push(status);
366    }
367
368    /// Removes the [`current_application`][`Self::current_application`] from the `call_stack`.
369    ///
370    /// Ensures the application's ID is also removed from the `active_applications` set.
371    ///
372    /// # Panics
373    ///
374    /// If the call stack is empty.
375    fn pop_application(&mut self) -> ApplicationStatus {
376        let status = self
377            .call_stack
378            .pop()
379            .expect("Can't remove application from empty call stack");
380        assert!(self.active_applications.remove(&status.id));
381        status
382    }
383
384    /// Ensures that a call to `application_id` is not-reentrant.
385    ///
386    /// Returns an error if there already is an entry for `application_id` in the call stack.
387    fn check_for_reentrancy(&self, application_id: ApplicationId) -> Result<(), ExecutionError> {
388        ensure!(
389            !self.active_applications.contains(&application_id),
390            ExecutionError::ReentrantCall(application_id)
391        );
392        Ok(())
393    }
394}
395
396impl SyncRuntimeInternal<UserContractInstance> {
397    /// Loads a contract instance, initializing it with this runtime if needed.
398    #[instrument(skip_all, fields(application_id = %id))]
399    fn load_contract_instance(
400        &mut self,
401        this: SyncRuntimeHandle<UserContractInstance>,
402        id: ApplicationId,
403    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
404        match self.loaded_applications.entry(id) {
405            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
406
407            hash_map::Entry::Vacant(entry) => {
408                // First time actually using the application. Let's see if the code was
409                // pre-loaded.
410                let (code, description) = match self.preloaded_applications.entry(id) {
411                    // TODO(#2927): support dynamic loading of modules on the Web
412                    #[cfg(web)]
413                    hash_map::Entry::Vacant(_) => {
414                        drop(this);
415                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
416                            id,
417                        )));
418                    }
419                    #[cfg(not(web))]
420                    hash_map::Entry::Vacant(entry) => {
421                        let (code, description) = self
422                            .execution_state_sender
423                            .send_request(move |callback| ExecutionRequest::LoadContract {
424                                id,
425                                callback,
426                            })?
427                            .recv_response()?;
428                        entry.insert((code, description)).clone()
429                    }
430                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
431                };
432                let instance = code.instantiate(this)?;
433
434                self.applications_to_finalize.push(id);
435                Ok(entry
436                    .insert(LoadedApplication::new(instance, description))
437                    .clone())
438            }
439        }
440    }
441
442    /// Configures the runtime for executing a call to a different contract.
443    fn prepare_for_call(
444        &mut self,
445        this: ContractSyncRuntimeHandle,
446        authenticated: bool,
447        callee_id: ApplicationId,
448    ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
449        self.check_for_reentrancy(callee_id)?;
450
451        ensure!(
452            !self.is_finalizing,
453            ExecutionError::CrossApplicationCallInFinalize {
454                caller_id: Box::new(self.current_application().id),
455                callee_id: Box::new(callee_id),
456            }
457        );
458
459        // Load the application.
460        let application = self.load_contract_instance(this, callee_id)?;
461
462        let caller = self.current_application();
463        let caller_id = caller.id;
464        let caller_signer = caller.signer;
465        // Make the call to user code.
466        let authenticated_owner = match caller_signer {
467            Some(signer) if authenticated => Some(signer),
468            _ => None,
469        };
470        let authenticated_caller_id = authenticated.then_some(caller_id);
471        self.push_application(ApplicationStatus {
472            caller_id: authenticated_caller_id,
473            id: callee_id,
474            description: application.description,
475            // Allow further nested calls to be authenticated if this one is.
476            signer: authenticated_owner,
477        });
478        Ok(application.instance)
479    }
480
481    /// Cleans up the runtime after the execution of a call to a different contract.
482    fn finish_call(&mut self) {
483        self.pop_application();
484    }
485
486    /// Runs the service in a separate thread as an oracle.
487    fn run_service_oracle_query(
488        &mut self,
489        application_id: ApplicationId,
490        query: Vec<u8>,
491    ) -> Result<Vec<u8>, ExecutionError> {
492        let timeout = self
493            .resource_controller
494            .remaining_service_oracle_execution_time()?;
495        let execution_start = Instant::now();
496        let deadline = Some(execution_start + timeout);
497        let response = self
498            .execution_state_sender
499            .send_request(|callback| ExecutionRequest::QueryServiceOracle {
500                deadline,
501                application_id,
502                next_block_height: self.height,
503                query,
504                callback,
505            })?
506            .recv_response()?;
507
508        self.resource_controller
509            .track_service_oracle_execution(execution_start.elapsed())?;
510        self.resource_controller
511            .track_service_oracle_response(response.len())?;
512
513        Ok(response)
514    }
515}
516
517impl SyncRuntimeInternal<UserServiceInstance> {
518    /// Initializes a service instance with this runtime.
519    fn load_service_instance(
520        &mut self,
521        this: SyncRuntimeHandle<UserServiceInstance>,
522        id: ApplicationId,
523    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
524        match self.loaded_applications.entry(id) {
525            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
526
527            hash_map::Entry::Vacant(entry) => {
528                // First time actually using the application. Let's see if the code was
529                // pre-loaded.
530                let (code, description) = match self.preloaded_applications.entry(id) {
531                    // TODO(#2927): support dynamic loading of modules on the Web
532                    #[cfg(web)]
533                    hash_map::Entry::Vacant(_) => {
534                        drop(this);
535                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
536                            id,
537                        )));
538                    }
539                    #[cfg(not(web))]
540                    hash_map::Entry::Vacant(entry) => {
541                        let (code, description) = self
542                            .execution_state_sender
543                            .send_request(move |callback| ExecutionRequest::LoadService {
544                                id,
545                                callback,
546                            })?
547                            .recv_response()?;
548                        entry.insert((code, description)).clone()
549                    }
550                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
551                };
552                let instance = code.instantiate(this)?;
553
554                self.applications_to_finalize.push(id);
555                Ok(entry
556                    .insert(LoadedApplication::new(instance, description))
557                    .clone())
558            }
559        }
560    }
561}
562
563impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
564    fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
565        let handle = self.0.take().expect(
566            "`SyncRuntime` should not be used after its `inner` contents have been moved out",
567        );
568        let runtime = Arc::into_inner(handle.0)?
569            .into_inner()
570            .expect("`SyncRuntime` should run in a single thread which should not panic");
571        Some(runtime)
572    }
573}
574
575impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
576    for SyncRuntimeHandle<UserInstance>
577{
578    fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
579        SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
580    }
581}
582
583impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
584    fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
585        self.0
586            .try_lock()
587            .expect("Synchronous runtimes run on a single execution thread")
588    }
589}
590
591impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
592where
593    Self: ContractOrServiceRuntime,
594{
595    type Read = ();
596    type ReadValueBytes = u32;
597    type ContainsKey = u32;
598    type ContainsKeys = u32;
599    type ReadMultiValuesBytes = u32;
600    type FindKeysByPrefix = u32;
601    type FindKeyValuesByPrefix = u32;
602
603    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
604        let mut this = self.inner();
605        let chain_id = this.chain_id;
606        this.resource_controller.track_runtime_chain_id()?;
607        Ok(chain_id)
608    }
609
610    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
611        let mut this = self.inner();
612        let height = this.height;
613        this.resource_controller.track_runtime_block_height()?;
614        Ok(height)
615    }
616
617    fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
618        let mut this = self.inner();
619        let application_id = this.current_application().id;
620        this.resource_controller.track_runtime_application_id()?;
621        Ok(application_id)
622    }
623
624    fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
625        let mut this = self.inner();
626        let application_creator_chain_id = this.current_application().description.creator_chain_id;
627        this.resource_controller.track_runtime_application_id()?;
628        Ok(application_creator_chain_id)
629    }
630
631    fn read_application_description(
632        &mut self,
633        application_id: ApplicationId,
634    ) -> Result<ApplicationDescription, ExecutionError> {
635        let mut this = self.inner();
636        let description = this
637            .execution_state_sender
638            .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
639                application_id,
640                callback,
641            })?
642            .recv_response()?;
643        this.resource_controller
644            .track_runtime_application_description(&description)?;
645        Ok(description)
646    }
647
648    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
649        let mut this = self.inner();
650        let parameters = this.current_application().description.parameters.clone();
651        this.resource_controller
652            .track_runtime_application_parameters(&parameters)?;
653        Ok(parameters)
654    }
655
656    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
657        let mut this = self.inner();
658        let timestamp = this
659            .execution_state_sender
660            .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
661            .recv_response()?;
662        this.resource_controller.track_runtime_timestamp()?;
663        Ok(timestamp)
664    }
665
666    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
667        let mut this = self.inner();
668        let balance = this
669            .execution_state_sender
670            .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
671            .recv_response()?;
672        this.resource_controller.track_runtime_balance()?;
673        Ok(balance)
674    }
675
676    fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
677        let mut this = self.inner();
678        let balance = this
679            .execution_state_sender
680            .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
681            .recv_response()?;
682        this.resource_controller.track_runtime_balance()?;
683        Ok(balance)
684    }
685
686    fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
687        let mut this = self.inner();
688        let owner_balances = this
689            .execution_state_sender
690            .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
691            .recv_response()?;
692        this.resource_controller
693            .track_runtime_owner_balances(&owner_balances)?;
694        Ok(owner_balances)
695    }
696
697    fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
698        let mut this = self.inner();
699        let owners = this
700            .execution_state_sender
701            .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
702            .recv_response()?;
703        this.resource_controller.track_runtime_owners(&owners)?;
704        Ok(owners)
705    }
706
707    fn read_allowance(
708        &mut self,
709        owner: AccountOwner,
710        spender: AccountOwner,
711    ) -> Result<Amount, ExecutionError> {
712        let this = self.inner();
713        let allowance = this
714            .execution_state_sender
715            .send_request(|callback| ExecutionRequest::Allowance {
716                owner,
717                spender,
718                callback,
719            })?
720            .recv_response()?;
721        Ok(allowance)
722    }
723
724    fn read_allowances(
725        &mut self,
726    ) -> Result<Vec<(AccountOwner, AccountOwner, Amount)>, ExecutionError> {
727        let this = self.inner();
728        let allowances = this
729            .execution_state_sender
730            .send_request(|callback| ExecutionRequest::Allowances { callback })?
731            .recv_response()?;
732        Ok(allowances)
733    }
734
735    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
736        let mut this = self.inner();
737        let chain_ownership = this
738            .execution_state_sender
739            .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
740            .recv_response()?;
741        this.resource_controller
742            .track_runtime_chain_ownership(&chain_ownership)?;
743        Ok(chain_ownership)
744    }
745
746    fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
747        let this = self.inner();
748        let application_permissions = this
749            .execution_state_sender
750            .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
751            .recv_response()?;
752        Ok(application_permissions)
753    }
754
755    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
756        let mut this = self.inner();
757        let id = this.current_application().id;
758        this.resource_controller.track_read_operation()?;
759        let receiver = this
760            .execution_state_sender
761            .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
762        let state = this.view_user_states.entry(id).or_default();
763        state.contains_key_queries.register(receiver)
764    }
765
766    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
767        let mut this = self.inner();
768        let id = this.current_application().id;
769        let state = this.view_user_states.entry(id).or_default();
770        let value = state.contains_key_queries.wait(*promise)?;
771        Ok(value)
772    }
773
774    fn contains_keys_new(
775        &mut self,
776        keys: Vec<Vec<u8>>,
777    ) -> Result<Self::ContainsKeys, ExecutionError> {
778        let mut this = self.inner();
779        let id = this.current_application().id;
780        this.resource_controller.track_read_operation()?;
781        let receiver = this
782            .execution_state_sender
783            .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
784        let state = this.view_user_states.entry(id).or_default();
785        state.contains_keys_queries.register(receiver)
786    }
787
788    fn contains_keys_wait(
789        &mut self,
790        promise: &Self::ContainsKeys,
791    ) -> Result<Vec<bool>, ExecutionError> {
792        let mut this = self.inner();
793        let id = this.current_application().id;
794        let state = this.view_user_states.entry(id).or_default();
795        let value = state.contains_keys_queries.wait(*promise)?;
796        Ok(value)
797    }
798
799    fn read_multi_values_bytes_new(
800        &mut self,
801        keys: Vec<Vec<u8>>,
802    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
803        let mut this = self.inner();
804        let id = this.current_application().id;
805        this.resource_controller.track_read_operation()?;
806        let receiver = this.execution_state_sender.send_request(move |callback| {
807            ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
808        })?;
809        let state = this.view_user_states.entry(id).or_default();
810        state.read_multi_values_queries.register(receiver)
811    }
812
813    fn read_multi_values_bytes_wait(
814        &mut self,
815        promise: &Self::ReadMultiValuesBytes,
816    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
817        let mut this = self.inner();
818        let id = this.current_application().id;
819        let state = this.view_user_states.entry(id).or_default();
820        let values = state.read_multi_values_queries.wait(*promise)?;
821        for value in &values {
822            if let Some(value) = &value {
823                this.resource_controller
824                    .track_bytes_read(value.len() as u64)?;
825            }
826        }
827        Ok(values)
828    }
829
830    fn read_value_bytes_new(
831        &mut self,
832        key: Vec<u8>,
833    ) -> Result<Self::ReadValueBytes, ExecutionError> {
834        let mut this = self.inner();
835        let id = this.current_application().id;
836        this.resource_controller.track_read_operation()?;
837        let receiver = this
838            .execution_state_sender
839            .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
840        let state = this.view_user_states.entry(id).or_default();
841        state.read_value_queries.register(receiver)
842    }
843
844    fn read_value_bytes_wait(
845        &mut self,
846        promise: &Self::ReadValueBytes,
847    ) -> Result<Option<Vec<u8>>, ExecutionError> {
848        let mut this = self.inner();
849        let id = this.current_application().id;
850        let value = {
851            let state = this.view_user_states.entry(id).or_default();
852            state.read_value_queries.wait(*promise)?
853        };
854        if let Some(value) = &value {
855            this.resource_controller
856                .track_bytes_read(value.len() as u64)?;
857        }
858        Ok(value)
859    }
860
861    fn find_keys_by_prefix_new(
862        &mut self,
863        key_prefix: Vec<u8>,
864    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
865        let mut this = self.inner();
866        let id = this.current_application().id;
867        this.resource_controller.track_read_operation()?;
868        let receiver = this.execution_state_sender.send_request(move |callback| {
869            ExecutionRequest::FindKeysByPrefix {
870                id,
871                key_prefix,
872                callback,
873            }
874        })?;
875        let state = this.view_user_states.entry(id).or_default();
876        state.find_keys_queries.register(receiver)
877    }
878
879    fn find_keys_by_prefix_wait(
880        &mut self,
881        promise: &Self::FindKeysByPrefix,
882    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
883        let mut this = self.inner();
884        let id = this.current_application().id;
885        let keys = {
886            let state = this.view_user_states.entry(id).or_default();
887            state.find_keys_queries.wait(*promise)?
888        };
889        let mut read_size = 0;
890        for key in &keys {
891            read_size += key.len();
892        }
893        this.resource_controller
894            .track_bytes_read(read_size as u64)?;
895        Ok(keys)
896    }
897
898    fn find_key_values_by_prefix_new(
899        &mut self,
900        key_prefix: Vec<u8>,
901    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
902        let mut this = self.inner();
903        let id = this.current_application().id;
904        this.resource_controller.track_read_operation()?;
905        let receiver = this.execution_state_sender.send_request(move |callback| {
906            ExecutionRequest::FindKeyValuesByPrefix {
907                id,
908                key_prefix,
909                callback,
910            }
911        })?;
912        let state = this.view_user_states.entry(id).or_default();
913        state.find_key_values_queries.register(receiver)
914    }
915
916    fn find_key_values_by_prefix_wait(
917        &mut self,
918        promise: &Self::FindKeyValuesByPrefix,
919    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
920        let mut this = self.inner();
921        let id = this.current_application().id;
922        let state = this.view_user_states.entry(id).or_default();
923        let key_values = state.find_key_values_queries.wait(*promise)?;
924        let mut read_size = 0;
925        for (key, value) in &key_values {
926            read_size += key.len() + value.len();
927        }
928        this.resource_controller
929            .track_bytes_read(read_size as u64)?;
930        Ok(key_values)
931    }
932
933    fn perform_http_request(
934        &mut self,
935        request: http::Request,
936    ) -> Result<http::Response, ExecutionError> {
937        let mut this = self.inner();
938        let app_permissions = this
939            .execution_state_sender
940            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
941            .recv_response()?;
942
943        let app_id = this.current_application().id;
944        ensure!(
945            app_permissions.can_make_http_requests(&app_id),
946            ExecutionError::UnauthorizedApplication(app_id)
947        );
948
949        this.resource_controller.track_http_request()?;
950
951        this.execution_state_sender
952            .send_request(|callback| ExecutionRequest::PerformHttpRequest {
953                request,
954                http_responses_are_oracle_responses:
955                    Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
956                callback,
957            })?
958            .recv_response()
959    }
960
961    fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
962        let this = self.inner();
963        this.execution_state_sender
964            .send_request(|callback| ExecutionRequest::AssertBefore {
965                timestamp,
966                callback,
967            })?
968            .recv_response()?
969    }
970
971    fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
972        let this = self.inner();
973        let blob_id = hash.into();
974        let content = this
975            .execution_state_sender
976            .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
977            .recv_response()?;
978        Ok(content.into_vec_or_clone())
979    }
980
981    fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
982        let this = self.inner();
983        let blob_id = hash.into();
984        this.execution_state_sender
985            .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
986            .recv_response()
987    }
988
989    fn has_empty_storage(&mut self, application: ApplicationId) -> Result<bool, ExecutionError> {
990        let this = self.inner();
991        this.execution_state_sender
992            .send_request(move |callback| ExecutionRequest::HasEmptyStorage {
993                application,
994                callback,
995            })?
996            .recv_response()
997    }
998
999    fn maximum_blob_size(&mut self) -> Result<u64, ExecutionError> {
1000        Ok(self.inner().resource_controller.policy().maximum_blob_size)
1001    }
1002
1003    fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
1004        Ok(self.inner().allow_application_logs)
1005    }
1006
1007    #[cfg(web)]
1008    fn send_log(&mut self, message: String, level: tracing::log::Level) {
1009        let this = self.inner();
1010        // Fire-and-forget: ignore errors since logging shouldn't affect execution.
1011        this.execution_state_sender
1012            .unbounded_send(ExecutionRequest::Log { message, level })
1013            .ok();
1014    }
1015}
1016
/// Extension trait that selects, at compile time, the behavior that differs between
/// contract and service runtimes in the shared [`BaseRuntime`] implementation.
trait ContractOrServiceRuntime {
    /// Whether an HTTP response must fit within the oracle response size limit.
    ///
    /// Services set this to `false`, which potentially lets them receive a larger HTTP
    /// response while only a shorter oracle response is stored in the block.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
1027
1028impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
1029    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
1030}
1031
1032impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
1033    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
1034}
1035
1036impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
1037    fn clone(&self) -> Self {
1038        SyncRuntimeHandle(self.0.clone())
1039    }
1040}
1041
1042impl ContractSyncRuntime {
1043    pub(crate) fn new(
1044        execution_state_sender: ExecutionStateSender,
1045        chain_id: ChainId,
1046        refund_grant_to: Option<Account>,
1047        resource_controller: ResourceController,
1048        action: &UserAction,
1049        allow_application_logs: bool,
1050    ) -> Self {
1051        SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1052            SyncRuntimeInternal::new(
1053                chain_id,
1054                action.height(),
1055                action.round(),
1056                if let UserAction::Message(context, _) = action {
1057                    Some(context.into())
1058                } else {
1059                    None
1060                },
1061                execution_state_sender,
1062                None,
1063                refund_grant_to,
1064                resource_controller,
1065                action.timestamp(),
1066                allow_application_logs,
1067            ),
1068        )))
1069    }
1070
1071    /// Preloads the code of a contract into the runtime's memory.
1072    pub(crate) fn preload_contract(
1073        &self,
1074        id: ApplicationId,
1075        code: UserContractCode,
1076        description: ApplicationDescription,
1077    ) {
1078        let this = self
1079            .0
1080            .as_ref()
1081            .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1082        let mut this_guard = this.inner();
1083
1084        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1085            entry.insert((code, description));
1086        }
1087    }
1088
1089    /// Main entry point to start executing a user action.
1090    pub(crate) fn run_action(
1091        mut self,
1092        application_id: ApplicationId,
1093        chain_id: ChainId,
1094        action: UserAction,
1095    ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1096        let result = self
1097            .deref_mut()
1098            .run_action(application_id, chain_id, action)?;
1099        let runtime = self
1100            .into_inner()
1101            .expect("Runtime clones should have been freed by now");
1102
1103        Ok((result, runtime.resource_controller))
1104    }
1105}
1106
1107impl ContractSyncRuntimeHandle {
1108    #[instrument(skip_all, fields(application_id = %application_id))]
1109    fn run_action(
1110        &self,
1111        application_id: ApplicationId,
1112        chain_id: ChainId,
1113        action: UserAction,
1114    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1115        let finalize_context = FinalizeContext {
1116            authenticated_owner: action.signer(),
1117            chain_id,
1118            height: action.height(),
1119            round: action.round(),
1120        };
1121
1122        {
1123            let runtime = self.inner();
1124            assert_eq!(runtime.chain_id, chain_id);
1125            assert_eq!(runtime.height, action.height());
1126        }
1127
1128        let signer = action.signer();
1129        let closure = move |code: &mut UserContractInstance| match action {
1130            UserAction::Instantiate(_context, argument) => {
1131                code.instantiate(argument).map(|()| None)
1132            }
1133            UserAction::Operation(_context, operation) => {
1134                code.execute_operation(operation).map(Option::Some)
1135            }
1136            UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1137            UserAction::ProcessStreams(_context, updates) => {
1138                code.process_streams(updates).map(|()| None)
1139            }
1140        };
1141
1142        let result = self.execute(application_id, signer, closure)?;
1143        self.finalize(finalize_context)?;
1144        Ok(result)
1145    }
1146
1147    /// Notifies all loaded applications that execution is finalizing.
1148    #[instrument(skip_all)]
1149    fn finalize(&self, context: FinalizeContext) -> Result<(), ExecutionError> {
1150        let applications = mem::take(&mut self.inner().applications_to_finalize)
1151            .into_iter()
1152            .rev();
1153
1154        self.inner().is_finalizing = true;
1155
1156        for application in applications {
1157            self.execute(application, context.authenticated_owner, |contract| {
1158                contract.finalize().map(|_| None)
1159            })?;
1160            self.inner().loaded_applications.remove(&application);
1161        }
1162
1163        Ok(())
1164    }
1165
1166    /// Executes a `closure` with the contract code for the `application_id`.
1167    #[instrument(skip_all, fields(application_id = %application_id))]
1168    fn execute(
1169        &self,
1170        application_id: ApplicationId,
1171        signer: Option<AccountOwner>,
1172        closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1173    ) -> Result<Option<Vec<u8>>, ExecutionError> {
1174        let contract = {
1175            let mut runtime = self.inner();
1176            let application = runtime.load_contract_instance(self.clone(), application_id)?;
1177
1178            let status = ApplicationStatus {
1179                caller_id: None,
1180                id: application_id,
1181                description: application.description.clone(),
1182                signer,
1183            };
1184
1185            runtime.push_application(status);
1186
1187            application
1188        };
1189
1190        let result = closure(
1191            &mut contract
1192                .instance
1193                .try_lock()
1194                .expect("Application should not be already executing"),
1195        )?;
1196
1197        let mut runtime = self.inner();
1198        let application_status = runtime.pop_application();
1199        assert_eq!(application_status.caller_id, None);
1200        assert_eq!(application_status.id, application_id);
1201        assert_eq!(application_status.description, contract.description);
1202        assert_eq!(application_status.signer, signer);
1203        assert!(runtime.call_stack.is_empty());
1204
1205        Ok(result)
1206    }
1207}
1208
1209impl ContractRuntime for ContractSyncRuntimeHandle {
1210    fn authenticated_owner(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1211        let this = self.inner();
1212        Ok(this.current_application().signer)
1213    }
1214
1215    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1216        Ok(self
1217            .inner()
1218            .executing_message
1219            .map(|metadata| metadata.is_bouncing))
1220    }
1221
1222    fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1223        Ok(self
1224            .inner()
1225            .executing_message
1226            .map(|metadata| metadata.origin))
1227    }
1228
1229    fn message_origin_timestamp(&mut self) -> Result<Option<Timestamp>, ExecutionError> {
1230        Ok(self
1231            .inner()
1232            .executing_message
1233            .map(|metadata| metadata.origin_timestamp))
1234    }
1235
1236    fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1237        let this = self.inner();
1238        if this.call_stack.len() <= 1 {
1239            return Ok(None);
1240        }
1241        Ok(this.current_application().caller_id)
1242    }
1243
1244    fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1245        Ok(match vm_runtime {
1246            VmRuntime::Wasm => {
1247                self.inner()
1248                    .resource_controller
1249                    .policy()
1250                    .maximum_wasm_fuel_per_block
1251            }
1252            VmRuntime::Evm => {
1253                self.inner()
1254                    .resource_controller
1255                    .policy()
1256                    .maximum_evm_fuel_per_block
1257            }
1258        })
1259    }
1260
1261    fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1262        Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1263    }
1264
1265    fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1266        let mut this = self.inner();
1267        this.resource_controller.track_fuel(fuel, vm_runtime)
1268    }
1269
1270    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1271        let mut this = self.inner();
1272        let application = this.current_application();
1273        let application_id = application.id;
1274        let authenticated_owner = application.signer;
1275        let mut refund_grant_to = this.refund_grant_to;
1276
1277        let grant = this
1278            .resource_controller
1279            .policy()
1280            .total_price(&message.grant)?;
1281        if grant.is_zero() {
1282            refund_grant_to = None;
1283        } else {
1284            this.resource_controller.track_grant(grant)?;
1285        }
1286        let kind = if message.is_tracked {
1287            MessageKind::Tracked
1288        } else {
1289            MessageKind::Simple
1290        };
1291
1292        this.execution_state_sender
1293            .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1294                message: OutgoingMessage {
1295                    destination: message.destination,
1296                    authenticated_owner,
1297                    refund_grant_to,
1298                    grant,
1299                    kind,
1300                    message: Message::User {
1301                        application_id,
1302                        bytes: message.message,
1303                    },
1304                },
1305                callback,
1306            })?
1307            .recv_response()?;
1308
1309        Ok(())
1310    }
1311
1312    fn transfer(
1313        &mut self,
1314        source: AccountOwner,
1315        destination: Account,
1316        amount: Amount,
1317    ) -> Result<(), ExecutionError> {
1318        let this = self.inner();
1319        let current_application = this.current_application();
1320        let application_id = current_application.id;
1321        let signer = current_application.signer;
1322
1323        this.execution_state_sender
1324            .send_request(|callback| ExecutionRequest::Transfer {
1325                source,
1326                destination,
1327                amount,
1328                signer,
1329                application_id,
1330                callback,
1331            })?
1332            .recv_response()?;
1333        Ok(())
1334    }
1335
1336    fn claim(
1337        &mut self,
1338        source: Account,
1339        destination: Account,
1340        amount: Amount,
1341    ) -> Result<(), ExecutionError> {
1342        let this = self.inner();
1343        let current_application = this.current_application();
1344        let application_id = current_application.id;
1345        let signer = current_application.signer;
1346
1347        this.execution_state_sender
1348            .send_request(|callback| ExecutionRequest::Claim {
1349                source,
1350                destination,
1351                amount,
1352                signer,
1353                application_id,
1354                callback,
1355            })?
1356            .recv_response()?;
1357        Ok(())
1358    }
1359
1360    fn approve(
1361        &mut self,
1362        owner: AccountOwner,
1363        spender: AccountOwner,
1364        amount: Amount,
1365    ) -> Result<(), ExecutionError> {
1366        let this = self.inner();
1367        let current_application = this.current_application();
1368        let application_id = current_application.id;
1369        let signer = current_application.signer;
1370
1371        this.execution_state_sender
1372            .send_request(|callback| ExecutionRequest::Approve {
1373                owner,
1374                spender,
1375                amount,
1376                signer,
1377                application_id,
1378                callback,
1379            })?
1380            .recv_response()?;
1381        Ok(())
1382    }
1383
1384    fn transfer_from(
1385        &mut self,
1386        owner: AccountOwner,
1387        spender: AccountOwner,
1388        destination: Account,
1389        amount: Amount,
1390    ) -> Result<(), ExecutionError> {
1391        let this = self.inner();
1392        let current_application = this.current_application();
1393        let application_id = current_application.id;
1394        let signer = current_application.signer;
1395
1396        this.execution_state_sender
1397            .send_request(|callback| ExecutionRequest::TransferFrom {
1398                owner,
1399                spender,
1400                destination,
1401                amount,
1402                signer,
1403                application_id,
1404                callback,
1405            })?
1406            .recv_response()?;
1407        Ok(())
1408    }
1409
1410    fn try_call_application(
1411        &mut self,
1412        authenticated: bool,
1413        callee_id: ApplicationId,
1414        argument: Vec<u8>,
1415    ) -> Result<Vec<u8>, ExecutionError> {
1416        let contract = self
1417            .inner()
1418            .prepare_for_call(self.clone(), authenticated, callee_id)?;
1419
1420        let value = contract
1421            .try_lock()
1422            .expect("Applications should not have reentrant calls")
1423            .execute_operation(argument)?;
1424
1425        self.inner().finish_call();
1426
1427        Ok(value)
1428    }
1429
1430    fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1431        let mut this = self.inner();
1432        ensure!(
1433            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1434            ExecutionError::StreamNameTooLong
1435        );
1436        let application_id = GenericApplicationId::User(this.current_application().id);
1437        let stream_id = StreamId {
1438            stream_name,
1439            application_id,
1440        };
1441        let value_len = value.len() as u64;
1442        let index = this
1443            .execution_state_sender
1444            .send_request(|callback| ExecutionRequest::Emit {
1445                stream_id,
1446                value,
1447                callback,
1448            })?
1449            .recv_response()?;
1450        // TODO(#365): Consider separate event fee categories.
1451        this.resource_controller.track_bytes_written(value_len)?;
1452        Ok(index)
1453    }
1454
1455    fn read_event(
1456        &mut self,
1457        chain_id: ChainId,
1458        stream_name: StreamName,
1459        index: u32,
1460    ) -> Result<Vec<u8>, ExecutionError> {
1461        let mut this = self.inner();
1462        ensure!(
1463            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1464            ExecutionError::StreamNameTooLong
1465        );
1466        let application_id = GenericApplicationId::User(this.current_application().id);
1467        let stream_id = StreamId {
1468            stream_name,
1469            application_id,
1470        };
1471        let event_id = EventId {
1472            stream_id,
1473            index,
1474            chain_id,
1475        };
1476        let event = this
1477            .execution_state_sender
1478            .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1479            .recv_response()?;
1480        // TODO(#365): Consider separate event fee categories.
1481        this.resource_controller
1482            .track_bytes_read(event.len() as u64)?;
1483        Ok(event)
1484    }
1485
1486    fn subscribe_to_events(
1487        &mut self,
1488        chain_id: ChainId,
1489        application_id: ApplicationId,
1490        stream_name: StreamName,
1491    ) -> Result<(), ExecutionError> {
1492        let this = self.inner();
1493        ensure!(
1494            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1495            ExecutionError::StreamNameTooLong
1496        );
1497        let stream_id = StreamId {
1498            stream_name,
1499            application_id: application_id.into(),
1500        };
1501        let subscriber_app_id = this.current_application().id;
1502        this.execution_state_sender
1503            .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1504                chain_id,
1505                stream_id,
1506                subscriber_app_id,
1507                callback,
1508            })?
1509            .recv_response()?;
1510        Ok(())
1511    }
1512
1513    fn unsubscribe_from_events(
1514        &mut self,
1515        chain_id: ChainId,
1516        application_id: ApplicationId,
1517        stream_name: StreamName,
1518    ) -> Result<(), ExecutionError> {
1519        let this = self.inner();
1520        ensure!(
1521            stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1522            ExecutionError::StreamNameTooLong
1523        );
1524        let stream_id = StreamId {
1525            stream_name,
1526            application_id: application_id.into(),
1527        };
1528        let subscriber_app_id = this.current_application().id;
1529        this.execution_state_sender
1530            .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1531                chain_id,
1532                stream_id,
1533                subscriber_app_id,
1534                callback,
1535            })?
1536            .recv_response()?;
1537        Ok(())
1538    }
1539
1540    fn query_service(
1541        &mut self,
1542        application_id: ApplicationId,
1543        query: Vec<u8>,
1544    ) -> Result<Vec<u8>, ExecutionError> {
1545        let mut this = self.inner();
1546
1547        let app_permissions = this
1548            .execution_state_sender
1549            .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1550            .recv_response()?;
1551
1552        let app_id = this.current_application().id;
1553        ensure!(
1554            app_permissions.can_call_services(&app_id),
1555            ExecutionError::UnauthorizedApplication(app_id)
1556        );
1557
1558        this.resource_controller.track_service_oracle_call()?;
1559
1560        this.run_service_oracle_query(application_id, query)
1561    }
1562
1563    fn open_chain(
1564        &mut self,
1565        ownership: ChainOwnership,
1566        application_permissions: ApplicationPermissions,
1567        balance: Amount,
1568    ) -> Result<ChainId, ExecutionError> {
1569        let parent_id = self.inner().chain_id;
1570        let block_height = self.block_height()?;
1571
1572        let timestamp = self.inner().user_context;
1573
1574        let chain_id = self
1575            .inner()
1576            .execution_state_sender
1577            .send_request(|callback| ExecutionRequest::OpenChain {
1578                ownership,
1579                balance,
1580                parent_id,
1581                block_height,
1582                timestamp,
1583                application_permissions,
1584                callback,
1585            })?
1586            .recv_response()?;
1587
1588        Ok(chain_id)
1589    }
1590
1591    fn close_chain(&mut self) -> Result<(), ExecutionError> {
1592        let this = self.inner();
1593        let application_id = this.current_application().id;
1594        this.execution_state_sender
1595            .send_request(|callback| ExecutionRequest::CloseChain {
1596                application_id,
1597                callback,
1598            })?
1599            .recv_response()?
1600    }
1601
1602    fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1603        let this = self.inner();
1604        let application_id = this.current_application().id;
1605        this.execution_state_sender
1606            .send_request(|callback| ExecutionRequest::ChangeOwnership {
1607                application_id,
1608                ownership,
1609                callback,
1610            })?
1611            .recv_response()?
1612    }
1613
1614    fn change_application_permissions(
1615        &mut self,
1616        application_permissions: ApplicationPermissions,
1617    ) -> Result<(), ExecutionError> {
1618        let this = self.inner();
1619        let application_id = this.current_application().id;
1620        this.execution_state_sender
1621            .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1622                application_id,
1623                application_permissions,
1624                callback,
1625            })?
1626            .recv_response()?
1627    }
1628
1629    fn peek_application_index(&mut self) -> Result<u32, ExecutionError> {
1630        let index = self
1631            .inner()
1632            .execution_state_sender
1633            .send_request(move |callback| ExecutionRequest::PeekApplicationIndex { callback })?
1634            .recv_response()?;
1635        Ok(index)
1636    }
1637
1638    fn create_application(
1639        &mut self,
1640        module_id: ModuleId,
1641        parameters: Vec<u8>,
1642        argument: Vec<u8>,
1643        required_application_ids: Vec<ApplicationId>,
1644    ) -> Result<ApplicationId, ExecutionError> {
1645        let chain_id = self.inner().chain_id;
1646        let block_height = self.block_height()?;
1647
1648        let CreateApplicationResult { app_id } = self
1649            .inner()
1650            .execution_state_sender
1651            .send_request(move |callback| ExecutionRequest::CreateApplication {
1652                chain_id,
1653                block_height,
1654                module_id,
1655                parameters,
1656                required_application_ids,
1657                callback,
1658            })?
1659            .recv_response()?;
1660
1661        let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1662
1663        contract
1664            .try_lock()
1665            .expect("Applications should not have reentrant calls")
1666            .instantiate(argument)?;
1667
1668        self.inner().finish_call();
1669
1670        Ok(app_id)
1671    }
1672
1673    fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1674        let blob = Blob::new_data(bytes);
1675        let blob_id = blob.id();
1676        let this = self.inner();
1677        this.execution_state_sender
1678            .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1679            .recv_response()?;
1680        Ok(DataBlobHash(blob_id.hash))
1681    }
1682
1683    fn publish_module(
1684        &mut self,
1685        contract: Bytecode,
1686        service: Bytecode,
1687        vm_runtime: VmRuntime,
1688        formats: Option<Vec<u8>>,
1689    ) -> Result<ModuleId, ExecutionError> {
1690        let (blobs, module_id) = crate::runtime::create_bytecode_blobs_sync(
1691            &contract,
1692            &service,
1693            vm_runtime,
1694            formats.as_deref(),
1695        );
1696        let this = self.inner();
1697        for blob in blobs {
1698            this.execution_state_sender
1699                .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1700                .recv_response()?;
1701        }
1702        Ok(module_id)
1703    }
1704
1705    fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1706        let this = self.inner();
1707        let round = this.round;
1708        this.execution_state_sender
1709            .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1710            .recv_response()
1711    }
1712
1713    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1714        let mut this = self.inner();
1715        let id = this.current_application().id;
1716        let state = this.view_user_states.entry(id).or_default();
1717        state.force_all_pending_queries()?;
1718        this.resource_controller.track_write_operations(
1719            batch
1720                .num_operations()
1721                .try_into()
1722                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1723        )?;
1724        this.resource_controller
1725            .track_bytes_written(batch.size() as u64)?;
1726        this.execution_state_sender
1727            .send_request(|callback| ExecutionRequest::WriteBatch {
1728                id,
1729                batch,
1730                callback,
1731            })?
1732            .recv_response()?;
1733        Ok(())
1734    }
1735}
1736
1737impl ServiceSyncRuntime {
1738    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1739    pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1740        Self::new_with_deadline(execution_state_sender, context, None)
1741    }
1742
1743    /// Creates a new [`ServiceSyncRuntime`] ready to execute using a provided [`QueryContext`].
1744    pub fn new_with_deadline(
1745        execution_state_sender: ExecutionStateSender,
1746        context: QueryContext,
1747        deadline: Option<Instant>,
1748    ) -> Self {
1749        // Query the allow_application_logs setting from the execution state.
1750        let allow_application_logs = execution_state_sender
1751            .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1752            .ok()
1753            .and_then(|receiver| receiver.recv_response().ok())
1754            .unwrap_or(false);
1755
1756        let runtime = SyncRuntime(Some(
1757            SyncRuntimeInternal::new(
1758                context.chain_id,
1759                context.next_block_height,
1760                None,
1761                None,
1762                execution_state_sender,
1763                deadline,
1764                None,
1765                ResourceController::default(),
1766                (),
1767                allow_application_logs,
1768            )
1769            .into(),
1770        ));
1771
1772        ServiceSyncRuntime {
1773            runtime,
1774            current_context: context,
1775        }
1776    }
1777
1778    /// Preloads the code of a service into the runtime's memory.
1779    pub(crate) fn preload_service(
1780        &self,
1781        id: ApplicationId,
1782        code: UserServiceCode,
1783        description: ApplicationDescription,
1784    ) {
1785        let this = self
1786            .runtime
1787            .0
1788            .as_ref()
1789            .expect("services shouldn't be preloaded while the runtime is being dropped");
1790        let mut this_guard = this.inner();
1791
1792        if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1793            entry.insert((code, description));
1794        }
1795    }
1796
1797    /// Runs the service runtime actor, waiting for `incoming_requests` to respond to.
1798    pub fn run(&mut self, incoming_requests: &std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1799        while let Ok(request) = incoming_requests.recv() {
1800            let ServiceRuntimeRequest::Query {
1801                application_id,
1802                context,
1803                query,
1804                callback,
1805            } = request;
1806
1807            let result = self
1808                .prepare_for_query(context)
1809                .and_then(|()| self.run_query(application_id, query));
1810
1811            if let Err(err) = callback.send(result) {
1812                tracing::debug!(%err, "Receiver for query result has been dropped");
1813            }
1814        }
1815    }
1816
1817    /// Prepares the runtime to query an application.
1818    pub(crate) fn prepare_for_query(
1819        &mut self,
1820        new_context: QueryContext,
1821    ) -> Result<(), ExecutionError> {
1822        let expected_context = QueryContext {
1823            local_time: new_context.local_time,
1824            ..self.current_context
1825        };
1826
1827        if new_context != expected_context {
1828            let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1829            *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1830        } else {
1831            self.handle_mut()
1832                .inner()
1833                .execution_state_sender
1834                .send_request(|callback| ExecutionRequest::SetLocalTime {
1835                    local_time: new_context.local_time,
1836                    callback,
1837                })?
1838                .recv_response()?;
1839        }
1840        Ok(())
1841    }
1842
1843    /// Queries an application specified by its [`ApplicationId`].
1844    pub(crate) fn run_query(
1845        &mut self,
1846        application_id: ApplicationId,
1847        query: Vec<u8>,
1848    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1849        let this = self.handle_mut();
1850        let response = this.try_query_application(application_id, query)?;
1851        let operations = mem::take(&mut this.inner().scheduled_operations);
1852
1853        Ok(QueryOutcome {
1854            response,
1855            operations,
1856        })
1857    }
1858
1859    /// Obtains the [`SyncRuntimeHandle`] stored in this [`ServiceSyncRuntime`].
1860    fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1861        self.runtime.0.as_mut().expect(
1862            "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1863        )
1864    }
1865}
1866
1867impl ServiceRuntime for ServiceSyncRuntimeHandle {
1868    /// Note that queries are not available from writable contexts.
1869    fn try_query_application(
1870        &mut self,
1871        queried_id: ApplicationId,
1872        argument: Vec<u8>,
1873    ) -> Result<Vec<u8>, ExecutionError> {
1874        let service = {
1875            let mut this = self.inner();
1876
1877            // Load the application.
1878            let application = this.load_service_instance(self.clone(), queried_id)?;
1879            // Make the call to user code.
1880            this.push_application(ApplicationStatus {
1881                caller_id: None,
1882                id: queried_id,
1883                description: application.description,
1884                signer: None,
1885            });
1886            application.instance
1887        };
1888        let response = service
1889            .try_lock()
1890            .expect("Applications should not have reentrant calls")
1891            .handle_query(argument)?;
1892        self.inner().pop_application();
1893        Ok(response)
1894    }
1895
1896    fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1897        let mut this = self.inner();
1898        let application_id = this.current_application().id;
1899
1900        this.scheduled_operations.push(Operation::User {
1901            application_id,
1902            bytes: operation,
1903        });
1904
1905        Ok(())
1906    }
1907
1908    fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1909        if let Some(deadline) = self.inner().deadline {
1910            if Instant::now() >= deadline {
1911                return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1912            }
1913        }
1914        Ok(())
1915    }
1916}
1917
/// A request to the service runtime actor.
pub enum ServiceRuntimeRequest {
    /// Asks the actor to prepare the runtime for `context`, run `query` against
    /// `application_id` and send the result through `callback`.
    Query {
        /// The application to query.
        application_id: ApplicationId,
        /// The context the runtime is prepared with before the query runs.
        context: QueryContext,
        /// The query bytes handed to the application.
        query: Vec<u8>,
        /// The channel on which the outcome of the query, or its error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1927
/// The origin of the execution.
///
/// Built from a [`MessageContext`] by copying the fields of the same name.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from the `is_bouncing` field of the [`MessageContext`].
    is_bouncing: bool,
    /// Copied from the `origin` field of the [`MessageContext`]; presumably the chain the
    /// message was sent from — verify against `MessageContext`.
    origin: ChainId,
    /// Copied from the `origin_timestamp` field of the [`MessageContext`].
    origin_timestamp: Timestamp,
}
1935
1936impl From<&MessageContext> for ExecutingMessage {
1937    fn from(context: &MessageContext) -> Self {
1938        ExecutingMessage {
1939            is_bouncing: context.is_bouncing,
1940            origin: context.origin,
1941            origin_timestamp: context.origin_timestamp,
1942        }
1943    }
1944}
1945
1946/// Creates a compressed contract and service bytecode synchronously, plus an
1947/// optional `ApplicationFormats` blob built from the JSON-encoded `Formats`
1948/// description bytes.
1949pub fn create_bytecode_blobs_sync(
1950    contract: &Bytecode,
1951    service: &Bytecode,
1952    vm_runtime: VmRuntime,
1953    formats: Option<&[u8]>,
1954) -> (Vec<Blob>, ModuleId) {
1955    let formats_blob = formats.map(Blob::new_application_formats);
1956    let formats_blob_hash = formats_blob.as_ref().map(|blob| blob.id().hash);
1957    let (mut blobs, module_id) = match vm_runtime {
1958        VmRuntime::Wasm => {
1959            let compressed_contract = contract.compress();
1960            let compressed_service = service.compress();
1961            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1962            let service_blob = Blob::new_service_bytecode(compressed_service);
1963            let module_id = ModuleId::new_with_formats(
1964                contract_blob.id().hash,
1965                service_blob.id().hash,
1966                vm_runtime,
1967                formats_blob_hash,
1968            );
1969            (vec![contract_blob, service_blob], module_id)
1970        }
1971        VmRuntime::Evm => {
1972            let compressed_contract = contract.compress();
1973            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1974            let module_id = ModuleId::new_with_formats(
1975                evm_contract_blob.id().hash,
1976                evm_contract_blob.id().hash,
1977                vm_runtime,
1978                formats_blob_hash,
1979            );
1980            (vec![evm_contract_blob], module_id)
1981        }
1982    };
1983    if let Some(blob) = formats_blob {
1984        blobs.push(blob);
1985    }
1986    (blobs, module_id)
1987}