1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9 time::Instant,
10};
11
12use custom_debug_derive::Debug;
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{
16 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
17 OracleResponse, SendMessageRequest, Timestamp,
18 },
19 ensure, http,
20 identifiers::{
21 Account, AccountOwner, BlobId, BlobType, ChainId, EventId, GenericApplicationId, MessageId,
22 StreamId, StreamName,
23 },
24 ownership::ChainOwnership,
25 vm::VmRuntime,
26};
27use linera_views::batch::Batch;
28use oneshot::Receiver;
29
30use crate::{
31 execution::UserAction,
32 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
33 resources::ResourceController,
34 system::CreateApplicationResult,
35 util::{ReceiverExt, UnboundedSenderExt},
36 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, ExecutionError,
37 FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation, OutgoingMessage,
38 QueryContext, QueryOutcome, ServiceRuntime, TransactionTracker, UserContractCode,
39 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
40};
41
42#[cfg(test)]
43#[path = "unit_tests/runtime_tests.rs"]
44mod tests;
45
/// Associates a user instance type with the type of the extra context value that the
/// runtime stores alongside it (the `user_context` field of `SyncRuntimeInternal`).
pub trait WithContext {
    /// The type of the context value carried by the runtime for this kind of instance.
    type UserContext;
}
49
/// Contract runtimes carry a [`Timestamp`] as their user context.
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
}
53
/// Service runtimes carry no additional user context.
impl WithContext for UserServiceInstance {
    type UserContext = ();
}
57
// Test-only implementation so that a type-erased `Arc<dyn Any + Send + Sync>` can stand in
// for a user instance; it carries no user context.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
}
62
/// Owning wrapper around an optional [`SyncRuntimeHandle`].
///
/// The handle is `Some` while the runtime is usable; it is taken out (leaving `None`) by
/// `into_inner` and by the `Drop` implementation.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
65
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
67
/// A [`SyncRuntime`] for service instances, bundled with a [`QueryContext`].
pub struct ServiceSyncRuntime {
    /// The underlying runtime for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context stored alongside the runtime.
    current_context: QueryContext,
}
72
/// A cloneable, shared handle to the runtime state ([`SyncRuntimeInternal`]) behind an
/// `Arc<Mutex<_>>`.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
77
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
80
#[derive(Debug)]
/// The state shared by all clones of a [`SyncRuntimeHandle`].
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The chain ID given at construction; reported by `BaseRuntime::chain_id`.
    chain_id: ChainId,
    /// The block height given at construction; reported by `BaseRuntime::block_height`.
    height: BlockHeight,
    /// The optional round number given at construction.
    round: Option<u32>,
    /// The optional authenticated signer given at construction.
    #[debug(skip_if = Option::is_none)]
    authenticated_signer: Option<AccountOwner>,
    /// Metadata of the message being executed; `Some` only when the runtime was created for
    /// a `UserAction::Message`.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// Channel used to send [`ExecutionRequest`]s and receive their responses.
    execution_state_sender: ExecutionStateSender,

    /// Set to `true` once `finalize` starts; while set, `prepare_for_call` rejects
    /// cross-application calls.
    is_finalizing: bool,
    /// IDs of the contracts that were loaded (or preloaded); `finalize` consumes this list
    /// in reverse order.
    applications_to_finalize: Vec<ApplicationId>,

    /// Instances (and descriptions) of the applications loaded so far, keyed by ID.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The stack of applications currently executing; the last entry is the current one.
    call_stack: Vec<ApplicationStatus>,
    /// The IDs of the applications on the `call_stack`; used to detect reentrant calls.
    active_applications: HashSet<ApplicationId>,
    /// The tracker for the current transaction (oracle responses, created blobs, ...).
    transaction_tracker: TransactionTracker,
    /// Operations returned by service oracle queries (see `run_service_oracle_query`).
    scheduled_operations: Vec<Operation>,

    /// Per-application bookkeeping of pending key-value store queries.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// Optional deadline; `ContractSyncRuntime::new` passes `None`, while service oracle
    /// queries create their runtime with `Some(..)`.
    deadline: Option<Instant>,

    /// Optional account given at construction.
    // NOTE(review): presumably where an unused grant is refunded -- confirm against users.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Controller used to track and limit resource usage.
    resource_controller: ResourceController,
    /// Extra context whose type depends on the kind of user instance (see [`WithContext`]).
    user_context: UserInstance::UserContext,
}
135
#[derive(Debug)]
/// An entry of the runtime's call stack.
struct ApplicationStatus {
    /// The ID of the calling application; only set by `prepare_for_call` when the call is
    /// authenticated, and `None` for entries pushed by `execute`.
    caller_id: Option<ApplicationId>,
    /// The ID of the application this entry describes.
    id: ApplicationId,
    /// The description of that application.
    description: ApplicationDescription,
    /// The signer forwarded to this application, if any.
    signer: Option<AccountOwner>,
}
148
#[derive(Debug)]
/// A loaded application: its shared instance together with its description.
struct LoadedApplication<Instance> {
    /// The application instance, shared behind an `Arc<Mutex<_>>`.
    instance: Arc<Mutex<Instance>>,
    /// The description of the application.
    description: ApplicationDescription,
}
155
156impl<Instance> LoadedApplication<Instance> {
157 fn new(instance: Instance, description: ApplicationDescription) -> Self {
159 LoadedApplication {
160 instance: Arc::new(Mutex::new(instance)),
161 description,
162 }
163 }
164}
165
166impl<Instance> Clone for LoadedApplication<Instance> {
167 fn clone(&self) -> Self {
170 LoadedApplication {
171 instance: self.instance.clone(),
172 description: self.description.clone(),
173 }
174 }
175}
176
#[derive(Debug)]
/// A value that is either already available or still has to be received from a channel.
enum Promise<T> {
    /// The value is available.
    Ready(T),
    /// The value has not been received yet; it will arrive through this receiver.
    Pending(Receiver<T>),
}
182
183impl<T> Promise<T> {
184 fn force(&mut self) -> Result<(), ExecutionError> {
185 if let Promise::Pending(receiver) = self {
186 let value = receiver
187 .recv_ref()
188 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
189 *self = Promise::Ready(value);
190 }
191 Ok(())
192 }
193
194 fn read(self) -> Result<T, ExecutionError> {
195 match self {
196 Promise::Pending(receiver) => {
197 let value = receiver.recv_response()?;
198 Ok(value)
199 }
200 Promise::Ready(value) => Ok(value),
201 }
202 }
203}
204
#[derive(Debug, Default)]
/// Bookkeeping for the pending queries of one kind, identified by `u32` IDs.
struct QueryManager<T> {
    /// The registered promises that have not been waited on yet, keyed by query ID.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries registered so far; also the ID given to the next query.
    query_count: u32,
    /// Counter incremented by `register` and decremented by `wait`.
    active_query_count: u32,
}
215
216impl<T> QueryManager<T> {
217 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
218 let id = self.query_count;
219 self.pending_queries.insert(id, Promise::Pending(receiver));
220 self.query_count = self
221 .query_count
222 .checked_add(1)
223 .ok_or(ArithmeticError::Overflow)?;
224 self.active_query_count = self
225 .active_query_count
226 .checked_add(1)
227 .ok_or(ArithmeticError::Overflow)?;
228 Ok(id)
229 }
230
231 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
232 let promise = self
233 .pending_queries
234 .remove(&id)
235 .ok_or(ExecutionError::InvalidPromise)?;
236 let value = promise.read()?;
237 self.active_query_count -= 1;
238 Ok(value)
239 }
240
241 fn force_all(&mut self) -> Result<(), ExecutionError> {
242 for promise in self.pending_queries.values_mut() {
243 promise.force()?;
244 }
245 Ok(())
246 }
247}
248
/// A list of keys, each a byte vector.
type Keys = Vec<Vec<u8>>;
/// A value, as a byte vector.
type Value = Vec<u8>;
/// A list of key-value pairs, each component a byte vector.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
252
#[derive(Debug, Default)]
/// The pending key-value store queries of one application, one manager per query kind.
struct ViewUserState {
    /// Pending `contains_key` queries.
    contains_key_queries: QueryManager<bool>,
    /// Pending `contains_keys` queries.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// Pending `read_value_bytes` queries.
    read_value_queries: QueryManager<Option<Value>>,
    /// Pending `read_multi_values_bytes` queries.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// Pending `find_keys_by_prefix` queries.
    find_keys_queries: QueryManager<Keys>,
    /// Pending `find_key_values_by_prefix` queries.
    find_key_values_queries: QueryManager<KeyValues>,
}
268
269impl ViewUserState {
270 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
271 self.contains_key_queries.force_all()?;
272 self.contains_keys_queries.force_all()?;
273 self.read_value_queries.force_all()?;
274 self.read_multi_values_queries.force_all()?;
275 self.find_keys_queries.force_all()?;
276 self.find_key_values_queries.force_all()?;
277 Ok(())
278 }
279}
280
281impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
282 type Target = SyncRuntimeHandle<UserInstance>;
283
284 fn deref(&self) -> &Self::Target {
285 self.0.as_ref().expect(
286 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
287 )
288 }
289}
290
291impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
292 fn deref_mut(&mut self) -> &mut Self::Target {
293 self.0.as_mut().expect(
294 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
295 )
296 }
297}
298
299impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
300 fn drop(&mut self) {
301 if let Some(handle) = self.0.take() {
304 handle.inner().loaded_applications.clear();
305 }
306 }
307}
308
309impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
310 #[expect(clippy::too_many_arguments)]
311 fn new(
312 chain_id: ChainId,
313 height: BlockHeight,
314 round: Option<u32>,
315 authenticated_signer: Option<AccountOwner>,
316 executing_message: Option<ExecutingMessage>,
317 execution_state_sender: ExecutionStateSender,
318 deadline: Option<Instant>,
319 refund_grant_to: Option<Account>,
320 resource_controller: ResourceController,
321 transaction_tracker: TransactionTracker,
322 user_context: UserInstance::UserContext,
323 ) -> Self {
324 Self {
325 chain_id,
326 height,
327 round,
328 authenticated_signer,
329 executing_message,
330 execution_state_sender,
331 is_finalizing: false,
332 applications_to_finalize: Vec::new(),
333 loaded_applications: HashMap::new(),
334 call_stack: Vec::new(),
335 active_applications: HashSet::new(),
336 view_user_states: BTreeMap::new(),
337 deadline,
338 refund_grant_to,
339 resource_controller,
340 transaction_tracker,
341 scheduled_operations: Vec::new(),
342 user_context,
343 }
344 }
345
346 fn current_application(&self) -> &ApplicationStatus {
354 self.call_stack
355 .last()
356 .expect("Call stack is unexpectedly empty")
357 }
358
359 fn push_application(&mut self, status: ApplicationStatus) {
363 self.active_applications.insert(status.id);
364 self.call_stack.push(status);
365 }
366
367 fn pop_application(&mut self) -> ApplicationStatus {
375 let status = self
376 .call_stack
377 .pop()
378 .expect("Can't remove application from empty call stack");
379 assert!(self.active_applications.remove(&status.id));
380 status
381 }
382
383 fn check_for_reentrancy(
387 &mut self,
388 application_id: ApplicationId,
389 ) -> Result<(), ExecutionError> {
390 ensure!(
391 !self.active_applications.contains(&application_id),
392 ExecutionError::ReentrantCall(application_id)
393 );
394 Ok(())
395 }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399 fn load_contract_instance(
401 &mut self,
402 this: SyncRuntimeHandle<UserContractInstance>,
403 id: ApplicationId,
404 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405 match self.loaded_applications.entry(id) {
406 #[cfg(web)]
408 hash_map::Entry::Vacant(_) => {
409 drop(this);
410 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
411 id,
412 )))
413 }
414 #[cfg(not(web))]
415 hash_map::Entry::Vacant(entry) => {
416 let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
417 let (code, description, txn_tracker_moved) = self
418 .execution_state_sender
419 .send_request(move |callback| ExecutionRequest::LoadContract {
420 id,
421 callback,
422 txn_tracker: txn_tracker_moved,
423 })?
424 .recv_response()?;
425 self.transaction_tracker = txn_tracker_moved;
426
427 let instance = code.instantiate(this)?;
428
429 self.applications_to_finalize.push(id);
430 Ok(entry
431 .insert(LoadedApplication::new(instance, description))
432 .clone())
433 }
434 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
435 }
436 }
437
438 fn prepare_for_call(
440 &mut self,
441 this: ContractSyncRuntimeHandle,
442 authenticated: bool,
443 callee_id: ApplicationId,
444 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
445 self.check_for_reentrancy(callee_id)?;
446
447 ensure!(
448 !self.is_finalizing,
449 ExecutionError::CrossApplicationCallInFinalize {
450 caller_id: Box::new(self.current_application().id),
451 callee_id: Box::new(callee_id),
452 }
453 );
454
455 let application = self.load_contract_instance(this, callee_id)?;
457
458 let caller = self.current_application();
459 let caller_id = caller.id;
460 let caller_signer = caller.signer;
461 let authenticated_signer = match caller_signer {
463 Some(signer) if authenticated => Some(signer),
464 _ => None,
465 };
466 let authenticated_caller_id = authenticated.then_some(caller_id);
467 self.push_application(ApplicationStatus {
468 caller_id: authenticated_caller_id,
469 id: callee_id,
470 description: application.description,
471 signer: authenticated_signer,
473 });
474 Ok(application.instance)
475 }
476
477 fn finish_call(&mut self) -> Result<(), ExecutionError> {
479 self.pop_application();
480 Ok(())
481 }
482
483 fn run_service_oracle_query(
485 &mut self,
486 application_id: ApplicationId,
487 query: Vec<u8>,
488 ) -> Result<Vec<u8>, ExecutionError> {
489 let context = QueryContext {
490 chain_id: self.chain_id,
491 next_block_height: self.height,
492 local_time: self.transaction_tracker.local_time(),
493 };
494 let sender = self.execution_state_sender.clone();
495
496 let txn_tracker = TransactionTracker::default()
497 .with_blobs(self.transaction_tracker.created_blobs().clone());
498
499 let timeout = self
500 .resource_controller
501 .remaining_service_oracle_execution_time()?;
502 let execution_start = Instant::now();
503 let deadline = Some(execution_start + timeout);
504
505 let mut service_runtime =
506 ServiceSyncRuntime::new_with_txn_tracker(sender, context, deadline, txn_tracker);
507
508 let result = service_runtime.run_query(application_id, query);
509
510 self.resource_controller
513 .track_service_oracle_execution(execution_start.elapsed())?;
514
515 let QueryOutcome {
516 response,
517 operations,
518 } = result?;
519
520 self.resource_controller
521 .track_service_oracle_response(response.len())?;
522
523 self.scheduled_operations.extend(operations);
524 Ok(response)
525 }
526}
527
528impl SyncRuntimeInternal<UserServiceInstance> {
529 fn load_service_instance(
531 &mut self,
532 this: ServiceSyncRuntimeHandle,
533 id: ApplicationId,
534 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
535 match self.loaded_applications.entry(id) {
536 #[cfg(web)]
538 hash_map::Entry::Vacant(_) => {
539 drop(this);
540 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
541 id,
542 )))
543 }
544 #[cfg(not(web))]
545 hash_map::Entry::Vacant(entry) => {
546 let txn_tracker_moved = mem::take(&mut self.transaction_tracker);
547 let (code, description, txn_tracker_moved) = self
548 .execution_state_sender
549 .send_request(move |callback| ExecutionRequest::LoadService {
550 id,
551 callback,
552 txn_tracker: txn_tracker_moved,
553 })?
554 .recv_response()?;
555 self.transaction_tracker = txn_tracker_moved;
556
557 let instance = code.instantiate(this)?;
558 Ok(entry
559 .insert(LoadedApplication::new(instance, description))
560 .clone())
561 }
562 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
563 }
564 }
565}
566
567impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
568 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
569 let handle = self.0.take().expect(
570 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
571 );
572 let runtime = Arc::into_inner(handle.0)?
573 .into_inner()
574 .expect("`SyncRuntime` should run in a single thread which should not panic");
575 Some(runtime)
576 }
577}
578
579impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
580 for SyncRuntimeHandle<UserInstance>
581{
582 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
583 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
584 }
585}
586
587impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
588 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
589 self.0
590 .try_lock()
591 .expect("Synchronous runtimes run on a single execution thread")
592 }
593}
594
595impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
596where
597 Self: ContractOrServiceRuntime,
598{
599 type Read = ();
600 type ReadValueBytes = u32;
601 type ContainsKey = u32;
602 type ContainsKeys = u32;
603 type ReadMultiValuesBytes = u32;
604 type FindKeysByPrefix = u32;
605 type FindKeyValuesByPrefix = u32;
606
607 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
608 let mut this = self.inner();
609 let chain_id = this.chain_id;
610 this.resource_controller.track_runtime_chain_id()?;
611 Ok(chain_id)
612 }
613
614 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
615 let mut this = self.inner();
616 let height = this.height;
617 this.resource_controller.track_runtime_block_height()?;
618 Ok(height)
619 }
620
621 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
622 let mut this = self.inner();
623 let application_id = this.current_application().id;
624 this.resource_controller.track_runtime_application_id()?;
625 Ok(application_id)
626 }
627
628 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
629 let mut this = self.inner();
630 let application_creator_chain_id = this.current_application().description.creator_chain_id;
631 this.resource_controller.track_runtime_application_id()?;
632 Ok(application_creator_chain_id)
633 }
634
635 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
636 let mut this = self.inner();
637 let parameters = this.current_application().description.parameters.clone();
638 this.resource_controller
639 .track_runtime_application_parameters(¶meters)?;
640 Ok(parameters)
641 }
642
643 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
644 let mut this = self.inner();
645 let timestamp = this
646 .execution_state_sender
647 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
648 .recv_response()?;
649 this.resource_controller.track_runtime_timestamp()?;
650 Ok(timestamp)
651 }
652
653 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
654 let mut this = self.inner();
655 let balance = this
656 .execution_state_sender
657 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
658 .recv_response()?;
659 this.resource_controller.track_runtime_balance()?;
660 Ok(balance)
661 }
662
663 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
664 let mut this = self.inner();
665 let balance = this
666 .execution_state_sender
667 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
668 .recv_response()?;
669 this.resource_controller.track_runtime_balance()?;
670 Ok(balance)
671 }
672
673 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
674 let mut this = self.inner();
675 let owner_balances = this
676 .execution_state_sender
677 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
678 .recv_response()?;
679 this.resource_controller
680 .track_runtime_owner_balances(&owner_balances)?;
681 Ok(owner_balances)
682 }
683
684 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
685 let mut this = self.inner();
686 let owners = this
687 .execution_state_sender
688 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
689 .recv_response()?;
690 this.resource_controller.track_runtime_owners(&owners)?;
691 Ok(owners)
692 }
693
694 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
695 let mut this = self.inner();
696 let chain_ownership = this
697 .execution_state_sender
698 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
699 .recv_response()?;
700 this.resource_controller
701 .track_runtime_chain_ownership(&chain_ownership)?;
702 Ok(chain_ownership)
703 }
704
705 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
706 let mut this = self.inner();
707 let id = this.current_application().id;
708 this.resource_controller.track_read_operation()?;
709 let receiver = this
710 .execution_state_sender
711 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
712 let state = this.view_user_states.entry(id).or_default();
713 state.contains_key_queries.register(receiver)
714 }
715
716 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
717 let mut this = self.inner();
718 let id = this.current_application().id;
719 let state = this.view_user_states.entry(id).or_default();
720 let value = state.contains_key_queries.wait(*promise)?;
721 Ok(value)
722 }
723
724 fn contains_keys_new(
725 &mut self,
726 keys: Vec<Vec<u8>>,
727 ) -> Result<Self::ContainsKeys, ExecutionError> {
728 let mut this = self.inner();
729 let id = this.current_application().id;
730 this.resource_controller.track_read_operation()?;
731 let receiver = this
732 .execution_state_sender
733 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
734 let state = this.view_user_states.entry(id).or_default();
735 state.contains_keys_queries.register(receiver)
736 }
737
738 fn contains_keys_wait(
739 &mut self,
740 promise: &Self::ContainsKeys,
741 ) -> Result<Vec<bool>, ExecutionError> {
742 let mut this = self.inner();
743 let id = this.current_application().id;
744 let state = this.view_user_states.entry(id).or_default();
745 let value = state.contains_keys_queries.wait(*promise)?;
746 Ok(value)
747 }
748
749 fn read_multi_values_bytes_new(
750 &mut self,
751 keys: Vec<Vec<u8>>,
752 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
753 let mut this = self.inner();
754 let id = this.current_application().id;
755 this.resource_controller.track_read_operation()?;
756 let receiver = this.execution_state_sender.send_request(move |callback| {
757 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
758 })?;
759 let state = this.view_user_states.entry(id).or_default();
760 state.read_multi_values_queries.register(receiver)
761 }
762
763 fn read_multi_values_bytes_wait(
764 &mut self,
765 promise: &Self::ReadMultiValuesBytes,
766 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
767 let mut this = self.inner();
768 let id = this.current_application().id;
769 let state = this.view_user_states.entry(id).or_default();
770 let values = state.read_multi_values_queries.wait(*promise)?;
771 for value in &values {
772 if let Some(value) = &value {
773 this.resource_controller
774 .track_bytes_read(value.len() as u64)?;
775 }
776 }
777 Ok(values)
778 }
779
780 fn read_value_bytes_new(
781 &mut self,
782 key: Vec<u8>,
783 ) -> Result<Self::ReadValueBytes, ExecutionError> {
784 let mut this = self.inner();
785 let id = this.current_application().id;
786 this.resource_controller.track_read_operation()?;
787 let receiver = this
788 .execution_state_sender
789 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
790 let state = this.view_user_states.entry(id).or_default();
791 state.read_value_queries.register(receiver)
792 }
793
794 fn read_value_bytes_wait(
795 &mut self,
796 promise: &Self::ReadValueBytes,
797 ) -> Result<Option<Vec<u8>>, ExecutionError> {
798 let mut this = self.inner();
799 let id = this.current_application().id;
800 let value = {
801 let state = this.view_user_states.entry(id).or_default();
802 state.read_value_queries.wait(*promise)?
803 };
804 if let Some(value) = &value {
805 this.resource_controller
806 .track_bytes_read(value.len() as u64)?;
807 }
808 Ok(value)
809 }
810
811 fn find_keys_by_prefix_new(
812 &mut self,
813 key_prefix: Vec<u8>,
814 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
815 let mut this = self.inner();
816 let id = this.current_application().id;
817 this.resource_controller.track_read_operation()?;
818 let receiver = this.execution_state_sender.send_request(move |callback| {
819 ExecutionRequest::FindKeysByPrefix {
820 id,
821 key_prefix,
822 callback,
823 }
824 })?;
825 let state = this.view_user_states.entry(id).or_default();
826 state.find_keys_queries.register(receiver)
827 }
828
829 fn find_keys_by_prefix_wait(
830 &mut self,
831 promise: &Self::FindKeysByPrefix,
832 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
833 let mut this = self.inner();
834 let id = this.current_application().id;
835 let keys = {
836 let state = this.view_user_states.entry(id).or_default();
837 state.find_keys_queries.wait(*promise)?
838 };
839 let mut read_size = 0;
840 for key in &keys {
841 read_size += key.len();
842 }
843 this.resource_controller
844 .track_bytes_read(read_size as u64)?;
845 Ok(keys)
846 }
847
848 fn find_key_values_by_prefix_new(
849 &mut self,
850 key_prefix: Vec<u8>,
851 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
852 let mut this = self.inner();
853 let id = this.current_application().id;
854 this.resource_controller.track_read_operation()?;
855 let receiver = this.execution_state_sender.send_request(move |callback| {
856 ExecutionRequest::FindKeyValuesByPrefix {
857 id,
858 key_prefix,
859 callback,
860 }
861 })?;
862 let state = this.view_user_states.entry(id).or_default();
863 state.find_key_values_queries.register(receiver)
864 }
865
866 fn find_key_values_by_prefix_wait(
867 &mut self,
868 promise: &Self::FindKeyValuesByPrefix,
869 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
870 let mut this = self.inner();
871 let id = this.current_application().id;
872 let state = this.view_user_states.entry(id).or_default();
873 let key_values = state.find_key_values_queries.wait(*promise)?;
874 let mut read_size = 0;
875 for (key, value) in &key_values {
876 read_size += key.len() + value.len();
877 }
878 this.resource_controller
879 .track_bytes_read(read_size as u64)?;
880 Ok(key_values)
881 }
882
883 fn perform_http_request(
884 &mut self,
885 request: http::Request,
886 ) -> Result<http::Response, ExecutionError> {
887 let mut this = self.inner();
888 let app_permissions = this
889 .execution_state_sender
890 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
891 .recv_response()?;
892
893 let app_id = this.current_application().id;
894 ensure!(
895 app_permissions.can_make_http_requests(&app_id),
896 ExecutionError::UnauthorizedApplication(app_id)
897 );
898
899 this.resource_controller.track_http_request()?;
900
901 let response =
902 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
903 match response {
904 OracleResponse::Http(response) => response,
905 _ => return Err(ExecutionError::OracleResponseMismatch),
906 }
907 } else {
908 this.execution_state_sender
909 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
910 request,
911 http_responses_are_oracle_responses:
912 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
913 callback,
914 })?
915 .recv_response()?
916 };
917 this.transaction_tracker
918 .add_oracle_response(OracleResponse::Http(response.clone()));
919 Ok(response)
920 }
921
922 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
923 let mut this = self.inner();
924 if !this
925 .transaction_tracker
926 .replay_oracle_response(OracleResponse::Assert)?
927 {
928 let local_time = this.transaction_tracker.local_time();
930 ensure!(
931 local_time < timestamp,
932 ExecutionError::AssertBefore {
933 timestamp,
934 local_time,
935 }
936 );
937 }
938 Ok(())
939 }
940
941 fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError> {
942 let mut this = self.inner();
943 let blob_id = BlobId::new(*hash, BlobType::Data);
944 let (blob_content, is_new) = this
945 .execution_state_sender
946 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
947 .recv_response()?;
948 if is_new {
949 this.transaction_tracker
950 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
951 }
952 Ok(blob_content.into_bytes().into_vec())
953 }
954
955 fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
956 let mut this = self.inner();
957 let blob_id = BlobId::new(*hash, BlobType::Data);
958 let is_new = this
959 .execution_state_sender
960 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
961 .recv_response()?;
962 if is_new {
963 this.transaction_tracker
964 .replay_oracle_response(OracleResponse::Blob(blob_id))?;
965 }
966 Ok(())
967 }
968}
969
/// Settings that differ between the contract and the service runtime handles.
trait ContractOrServiceRuntime {
    /// Passed as `http_responses_are_oracle_responses` in
    /// `ExecutionRequest::PerformHttpRequest` by `perform_http_request`.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
980
/// For contracts, HTTP responses are flagged as oracle responses.
impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
}
984
/// For services, HTTP responses are not flagged as oracle responses.
impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
}
988
989impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
990 fn clone(&self) -> Self {
991 SyncRuntimeHandle(self.0.clone())
992 }
993}
994
995impl ContractSyncRuntime {
996 pub(crate) fn new(
997 execution_state_sender: ExecutionStateSender,
998 chain_id: ChainId,
999 refund_grant_to: Option<Account>,
1000 resource_controller: ResourceController,
1001 action: &UserAction,
1002 txn_tracker: TransactionTracker,
1003 ) -> Self {
1004 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1005 SyncRuntimeInternal::new(
1006 chain_id,
1007 action.height(),
1008 action.round(),
1009 action.signer(),
1010 if let UserAction::Message(context, _) = action {
1011 Some(context.into())
1012 } else {
1013 None
1014 },
1015 execution_state_sender,
1016 None,
1017 refund_grant_to,
1018 resource_controller,
1019 txn_tracker,
1020 action.timestamp(),
1021 ),
1022 )))
1023 }
1024
1025 pub(crate) fn preload_contract(
1026 &self,
1027 id: ApplicationId,
1028 code: UserContractCode,
1029 description: ApplicationDescription,
1030 ) -> Result<(), ExecutionError> {
1031 let this = self
1032 .0
1033 .as_ref()
1034 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1035 let runtime_handle = this.clone();
1036 let mut this_guard = this.inner();
1037
1038 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1039 entry.insert(LoadedApplication::new(
1040 code.instantiate(runtime_handle)?,
1041 description,
1042 ));
1043 this_guard.applications_to_finalize.push(id);
1044 }
1045
1046 Ok(())
1047 }
1048
1049 pub(crate) fn run_action(
1051 mut self,
1052 application_id: ApplicationId,
1053 chain_id: ChainId,
1054 action: UserAction,
1055 ) -> Result<(Option<Vec<u8>>, ResourceController, TransactionTracker), ExecutionError> {
1056 let result = self
1057 .deref_mut()
1058 .run_action(application_id, chain_id, action)?;
1059 let runtime = self
1060 .into_inner()
1061 .expect("Runtime clones should have been freed by now");
1062 Ok((
1063 result,
1064 runtime.resource_controller,
1065 runtime.transaction_tracker,
1066 ))
1067 }
1068}
1069
1070impl ContractSyncRuntimeHandle {
1071 fn run_action(
1072 &mut self,
1073 application_id: ApplicationId,
1074 chain_id: ChainId,
1075 action: UserAction,
1076 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1077 let finalize_context = FinalizeContext {
1078 authenticated_signer: action.signer(),
1079 chain_id,
1080 height: action.height(),
1081 round: action.round(),
1082 };
1083
1084 {
1085 let runtime = self.inner();
1086 assert_eq!(runtime.authenticated_signer, action.signer());
1087 assert_eq!(runtime.chain_id, chain_id);
1088 assert_eq!(runtime.height, action.height());
1089 }
1090
1091 let signer = action.signer();
1092 let closure = move |code: &mut UserContractInstance| match action {
1093 UserAction::Instantiate(_context, argument) => {
1094 code.instantiate(argument).map(|()| None)
1095 }
1096 UserAction::Operation(_context, operation) => {
1097 code.execute_operation(operation).map(Option::Some)
1098 }
1099 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1100 UserAction::ProcessStreams(_context, updates) => {
1101 code.process_streams(updates).map(|()| None)
1102 }
1103 };
1104
1105 let result = self.execute(application_id, signer, closure)?;
1106 self.finalize(finalize_context)?;
1107 Ok(result)
1108 }
1109
1110 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1112 let applications = mem::take(&mut self.inner().applications_to_finalize)
1113 .into_iter()
1114 .rev();
1115
1116 self.inner().is_finalizing = true;
1117
1118 for application in applications {
1119 self.execute(application, context.authenticated_signer, |contract| {
1120 contract.finalize().map(|_| None)
1121 })?;
1122 self.inner().loaded_applications.remove(&application);
1123 }
1124
1125 Ok(())
1126 }
1127
1128 fn execute(
1130 &mut self,
1131 application_id: ApplicationId,
1132 signer: Option<AccountOwner>,
1133 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1134 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1135 let contract = {
1136 let mut runtime = self.inner();
1137 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1138
1139 let status = ApplicationStatus {
1140 caller_id: None,
1141 id: application_id,
1142 description: application.description.clone(),
1143 signer,
1144 };
1145
1146 runtime.push_application(status);
1147
1148 application
1149 };
1150
1151 let result = closure(
1152 &mut contract
1153 .instance
1154 .try_lock()
1155 .expect("Application should not be already executing"),
1156 )?;
1157
1158 let mut runtime = self.inner();
1159 let application_status = runtime.pop_application();
1160 assert_eq!(application_status.caller_id, None);
1161 assert_eq!(application_status.id, application_id);
1162 assert_eq!(application_status.description, contract.description);
1163 assert_eq!(application_status.signer, signer);
1164 assert!(runtime.call_stack.is_empty());
1165
1166 Ok(result)
1167 }
1168}
1169
1170impl ContractRuntime for ContractSyncRuntimeHandle {
1171 fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1172 Ok(self.inner().authenticated_signer)
1173 }
1174
1175 fn message_id(&mut self) -> Result<Option<MessageId>, ExecutionError> {
1176 Ok(self.inner().executing_message.map(|metadata| metadata.id))
1177 }
1178
1179 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1180 Ok(self
1181 .inner()
1182 .executing_message
1183 .map(|metadata| metadata.is_bouncing))
1184 }
1185
1186 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1187 let this = self.inner();
1188 if this.call_stack.len() <= 1 {
1189 return Ok(None);
1190 }
1191 Ok(this.current_application().caller_id)
1192 }
1193
1194 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1195 Ok(match vm_runtime {
1196 VmRuntime::Wasm => {
1197 self.inner()
1198 .resource_controller
1199 .policy()
1200 .maximum_wasm_fuel_per_block
1201 }
1202 VmRuntime::Evm => {
1203 self.inner()
1204 .resource_controller
1205 .policy()
1206 .maximum_evm_fuel_per_block
1207 }
1208 })
1209 }
1210
1211 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1212 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1213 }
1214
1215 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1216 let mut this = self.inner();
1217 this.resource_controller.track_fuel(fuel, vm_runtime)
1218 }
1219
1220 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1221 let mut this = self.inner();
1222 let application = this.current_application();
1223 let application_id = application.id;
1224 let authenticated_signer = application.signer;
1225 let mut refund_grant_to = this.refund_grant_to;
1226
1227 let grant = this
1228 .resource_controller
1229 .policy()
1230 .total_price(&message.grant)?;
1231 if grant.is_zero() {
1232 refund_grant_to = None;
1233 } else {
1234 this.resource_controller.track_grant(grant)?;
1235 }
1236 let kind = if message.is_tracked {
1237 MessageKind::Tracked
1238 } else {
1239 MessageKind::Simple
1240 };
1241
1242 this.transaction_tracker
1243 .add_outgoing_message(OutgoingMessage {
1244 destination: message.destination,
1245 authenticated_signer,
1246 refund_grant_to,
1247 grant,
1248 kind,
1249 message: Message::User {
1250 application_id,
1251 bytes: message.message,
1252 },
1253 })?;
1254
1255 Ok(())
1256 }
1257
1258 fn transfer(
1259 &mut self,
1260 source: AccountOwner,
1261 destination: Account,
1262 amount: Amount,
1263 ) -> Result<(), ExecutionError> {
1264 let mut this = self.inner();
1265 let current_application = this.current_application();
1266 let application_id = current_application.id;
1267 let signer = current_application.signer;
1268
1269 let maybe_message = this
1270 .execution_state_sender
1271 .send_request(|callback| ExecutionRequest::Transfer {
1272 source,
1273 destination,
1274 amount,
1275 signer,
1276 application_id,
1277 callback,
1278 })?
1279 .recv_response()?;
1280
1281 this.transaction_tracker
1282 .add_outgoing_messages(maybe_message)?;
1283 Ok(())
1284 }
1285
1286 fn claim(
1287 &mut self,
1288 source: Account,
1289 destination: Account,
1290 amount: Amount,
1291 ) -> Result<(), ExecutionError> {
1292 let mut this = self.inner();
1293 let current_application = this.current_application();
1294 let application_id = current_application.id;
1295 let signer = current_application.signer;
1296
1297 let message = this
1298 .execution_state_sender
1299 .send_request(|callback| ExecutionRequest::Claim {
1300 source,
1301 destination,
1302 amount,
1303 signer,
1304 application_id,
1305 callback,
1306 })?
1307 .recv_response()?;
1308 this.transaction_tracker.add_outgoing_message(message)?;
1309 Ok(())
1310 }
1311
1312 fn try_call_application(
1313 &mut self,
1314 authenticated: bool,
1315 callee_id: ApplicationId,
1316 argument: Vec<u8>,
1317 ) -> Result<Vec<u8>, ExecutionError> {
1318 let contract = self
1319 .inner()
1320 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1321
1322 let value = contract
1323 .try_lock()
1324 .expect("Applications should not have reentrant calls")
1325 .execute_operation(argument)?;
1326
1327 self.inner().finish_call()?;
1328
1329 Ok(value)
1330 }
1331
1332 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1333 let mut this = self.inner();
1334 ensure!(
1335 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1336 ExecutionError::StreamNameTooLong
1337 );
1338 let application_id = GenericApplicationId::User(this.current_application().id);
1339 let stream_id = StreamId {
1340 stream_name,
1341 application_id,
1342 };
1343 let index = this
1344 .execution_state_sender
1345 .send_request(|callback| ExecutionRequest::NextEventIndex {
1346 stream_id: stream_id.clone(),
1347 callback,
1348 })?
1349 .recv_response()?;
1350 this.resource_controller
1352 .track_bytes_written(value.len() as u64)?;
1353 this.transaction_tracker.add_event(stream_id, index, value);
1354 Ok(index)
1355 }
1356
1357 fn read_event(
1358 &mut self,
1359 chain_id: ChainId,
1360 stream_name: StreamName,
1361 index: u32,
1362 ) -> Result<Vec<u8>, ExecutionError> {
1363 let mut this = self.inner();
1364 ensure!(
1365 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1366 ExecutionError::StreamNameTooLong
1367 );
1368 let application_id = GenericApplicationId::User(this.current_application().id);
1369 let stream_id = StreamId {
1370 stream_name,
1371 application_id,
1372 };
1373 let event_id = EventId {
1374 stream_id,
1375 index,
1376 chain_id,
1377 };
1378 let event = this
1379 .execution_state_sender
1380 .send_request(|callback| ExecutionRequest::ReadEvent {
1381 event_id: event_id.clone(),
1382 callback,
1383 })?
1384 .recv_response()?;
1385 this.resource_controller
1387 .track_bytes_read(event.len() as u64)?;
1388 this.transaction_tracker
1389 .replay_oracle_response(OracleResponse::Event(event_id, event.clone()))?;
1390 Ok(event)
1391 }
1392
1393 fn subscribe_to_events(
1394 &mut self,
1395 chain_id: ChainId,
1396 application_id: ApplicationId,
1397 stream_name: StreamName,
1398 ) -> Result<(), ExecutionError> {
1399 let mut this = self.inner();
1400 ensure!(
1401 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1402 ExecutionError::StreamNameTooLong
1403 );
1404 let stream_id = StreamId {
1405 stream_name,
1406 application_id: application_id.into(),
1407 };
1408 let subscriber_app_id = this.current_application().id;
1409 let next_index = this
1410 .execution_state_sender
1411 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1412 chain_id,
1413 stream_id: stream_id.clone(),
1414 subscriber_app_id,
1415 callback,
1416 })?
1417 .recv_response()?;
1418 this.transaction_tracker.add_stream_to_process(
1419 subscriber_app_id,
1420 chain_id,
1421 stream_id,
1422 0,
1423 next_index,
1424 );
1425 Ok(())
1426 }
1427
1428 fn unsubscribe_from_events(
1429 &mut self,
1430 chain_id: ChainId,
1431 application_id: ApplicationId,
1432 stream_name: StreamName,
1433 ) -> Result<(), ExecutionError> {
1434 let mut this = self.inner();
1435 ensure!(
1436 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1437 ExecutionError::StreamNameTooLong
1438 );
1439 let stream_id = StreamId {
1440 stream_name,
1441 application_id: application_id.into(),
1442 };
1443 let subscriber_app_id = this.current_application().id;
1444 this.execution_state_sender
1445 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1446 chain_id,
1447 stream_id: stream_id.clone(),
1448 subscriber_app_id,
1449 callback,
1450 })?
1451 .recv_response()?;
1452 this.transaction_tracker
1453 .remove_stream_to_process(application_id, chain_id, stream_id);
1454 Ok(())
1455 }
1456
1457 fn query_service(
1458 &mut self,
1459 application_id: ApplicationId,
1460 query: Vec<u8>,
1461 ) -> Result<Vec<u8>, ExecutionError> {
1462 let mut this = self.inner();
1463
1464 let app_permissions = this
1465 .execution_state_sender
1466 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1467 .recv_response()?;
1468
1469 let app_id = this.current_application().id;
1470 ensure!(
1471 app_permissions.can_call_services(&app_id),
1472 ExecutionError::UnauthorizedApplication(app_id)
1473 );
1474
1475 this.resource_controller.track_service_oracle_call()?;
1476 let response =
1477 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1478 match response {
1479 OracleResponse::Service(bytes) => bytes,
1480 _ => return Err(ExecutionError::OracleResponseMismatch),
1481 }
1482 } else {
1483 this.run_service_oracle_query(application_id, query)?
1484 };
1485
1486 this.transaction_tracker
1487 .add_oracle_response(OracleResponse::Service(response.clone()));
1488
1489 Ok(response)
1490 }
1491
1492 fn open_chain(
1493 &mut self,
1494 ownership: ChainOwnership,
1495 application_permissions: ApplicationPermissions,
1496 balance: Amount,
1497 ) -> Result<ChainId, ExecutionError> {
1498 let parent_id = self.inner().chain_id;
1499 let block_height = self.block_height()?;
1500
1501 let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1502 let timestamp = self.inner().user_context;
1503
1504 let (chain_id, txn_tracker_moved) = self
1505 .inner()
1506 .execution_state_sender
1507 .send_request(|callback| ExecutionRequest::OpenChain {
1508 ownership,
1509 balance,
1510 parent_id,
1511 block_height,
1512 timestamp,
1513 application_permissions,
1514 callback,
1515 txn_tracker: txn_tracker_moved,
1516 })?
1517 .recv_response()?;
1518
1519 self.inner().transaction_tracker = txn_tracker_moved;
1520
1521 Ok(chain_id)
1522 }
1523
1524 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1525 let this = self.inner();
1526 let application_id = this.current_application().id;
1527 this.execution_state_sender
1528 .send_request(|callback| ExecutionRequest::CloseChain {
1529 application_id,
1530 callback,
1531 })?
1532 .recv_response()?
1533 }
1534
1535 fn change_application_permissions(
1536 &mut self,
1537 application_permissions: ApplicationPermissions,
1538 ) -> Result<(), ExecutionError> {
1539 let this = self.inner();
1540 let application_id = this.current_application().id;
1541 this.execution_state_sender
1542 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1543 application_id,
1544 application_permissions,
1545 callback,
1546 })?
1547 .recv_response()?
1548 }
1549
1550 fn create_application(
1551 &mut self,
1552 module_id: ModuleId,
1553 parameters: Vec<u8>,
1554 argument: Vec<u8>,
1555 required_application_ids: Vec<ApplicationId>,
1556 ) -> Result<ApplicationId, ExecutionError> {
1557 let chain_id = self.inner().chain_id;
1558 let block_height = self.block_height()?;
1559
1560 let txn_tracker_moved = mem::take(&mut self.inner().transaction_tracker);
1561
1562 let CreateApplicationResult {
1563 app_id,
1564 txn_tracker: txn_tracker_moved,
1565 } = self
1566 .inner()
1567 .execution_state_sender
1568 .send_request(move |callback| ExecutionRequest::CreateApplication {
1569 chain_id,
1570 block_height,
1571 module_id,
1572 parameters,
1573 required_application_ids,
1574 callback,
1575 txn_tracker: txn_tracker_moved,
1576 })?
1577 .recv_response()??;
1578
1579 self.inner().transaction_tracker = txn_tracker_moved;
1580
1581 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1582
1583 contract
1584 .try_lock()
1585 .expect("Applications should not have reentrant calls")
1586 .instantiate(argument)?;
1587
1588 self.inner().finish_call()?;
1589
1590 Ok(app_id)
1591 }
1592
1593 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<BlobId, ExecutionError> {
1594 let blob = Blob::new_data(bytes);
1595 let blob_id = blob.id();
1596 self.inner().transaction_tracker.add_created_blob(blob);
1597 Ok(blob_id)
1598 }
1599
1600 fn publish_module(
1601 &mut self,
1602 contract: Bytecode,
1603 service: Bytecode,
1604 vm_runtime: VmRuntime,
1605 ) -> Result<ModuleId, ExecutionError> {
1606 let (blobs, module_id) =
1607 crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1608 for blob in blobs {
1609 self.inner().transaction_tracker.add_created_blob(blob);
1610 }
1611 Ok(module_id)
1612 }
1613
1614 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1615 let mut this = self.inner();
1616 let round =
1617 if let Some(response) = this.transaction_tracker.next_replayed_oracle_response()? {
1618 match response {
1619 OracleResponse::Round(round) => round,
1620 _ => return Err(ExecutionError::OracleResponseMismatch),
1621 }
1622 } else {
1623 this.round
1624 };
1625 this.transaction_tracker
1626 .add_oracle_response(OracleResponse::Round(round));
1627 Ok(round)
1628 }
1629
1630 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1631 let mut this = self.inner();
1632 let id = this.current_application().id;
1633 let state = this.view_user_states.entry(id).or_default();
1634 state.force_all_pending_queries()?;
1635 this.resource_controller.track_write_operations(
1636 batch
1637 .num_operations()
1638 .try_into()
1639 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1640 )?;
1641 this.resource_controller
1642 .track_bytes_written(batch.size() as u64)?;
1643 this.execution_state_sender
1644 .send_request(|callback| ExecutionRequest::WriteBatch {
1645 id,
1646 batch,
1647 callback,
1648 })?
1649 .recv_response()?;
1650 Ok(())
1651 }
1652}
1653
1654impl ServiceSyncRuntime {
1655 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1657 let mut txn_tracker = TransactionTracker::default();
1658 txn_tracker.set_local_time(context.local_time);
1659 Self::new_with_txn_tracker(execution_state_sender, context, None, txn_tracker)
1660 }
1661
1662 pub fn new_with_txn_tracker(
1664 execution_state_sender: ExecutionStateSender,
1665 context: QueryContext,
1666 deadline: Option<Instant>,
1667 txn_tracker: TransactionTracker,
1668 ) -> Self {
1669 let runtime = SyncRuntime(Some(
1670 SyncRuntimeInternal::new(
1671 context.chain_id,
1672 context.next_block_height,
1673 None,
1674 None,
1675 None,
1676 execution_state_sender,
1677 deadline,
1678 None,
1679 ResourceController::default(),
1680 txn_tracker,
1681 (),
1682 )
1683 .into(),
1684 ));
1685
1686 ServiceSyncRuntime {
1687 runtime,
1688 current_context: context,
1689 }
1690 }
1691
1692 pub(crate) fn preload_service(
1694 &self,
1695 id: ApplicationId,
1696 code: UserServiceCode,
1697 description: ApplicationDescription,
1698 ) -> Result<(), ExecutionError> {
1699 let this = self
1700 .runtime
1701 .0
1702 .as_ref()
1703 .expect("services shouldn't be preloaded while the runtime is being dropped");
1704 let runtime_handle = this.clone();
1705 let mut this_guard = this.inner();
1706
1707 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1708 entry.insert(LoadedApplication::new(
1709 code.instantiate(runtime_handle)?,
1710 description,
1711 ));
1712 this_guard.applications_to_finalize.push(id);
1713 }
1714
1715 Ok(())
1716 }
1717
1718 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1720 while let Ok(request) = incoming_requests.recv() {
1721 let ServiceRuntimeRequest::Query {
1722 application_id,
1723 context,
1724 query,
1725 callback,
1726 } = request;
1727
1728 self.prepare_for_query(context);
1729
1730 let _ = callback.send(self.run_query(application_id, query));
1731 }
1732 }
1733
1734 pub(crate) fn prepare_for_query(&mut self, new_context: QueryContext) {
1736 let expected_context = QueryContext {
1737 local_time: new_context.local_time,
1738 ..self.current_context
1739 };
1740
1741 if new_context != expected_context {
1742 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1743 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1744 } else {
1745 self.handle_mut()
1746 .inner()
1747 .transaction_tracker
1748 .set_local_time(new_context.local_time);
1749 }
1750 }
1751
1752 pub(crate) fn run_query(
1754 &mut self,
1755 application_id: ApplicationId,
1756 query: Vec<u8>,
1757 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1758 let this = self.handle_mut();
1759 let response = this.try_query_application(application_id, query)?;
1760 let operations = mem::take(&mut this.inner().scheduled_operations);
1761
1762 Ok(QueryOutcome {
1763 response,
1764 operations,
1765 })
1766 }
1767
1768 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1770 self.runtime.0.as_mut().expect(
1771 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1772 )
1773 }
1774}
1775
1776impl ServiceRuntime for ServiceSyncRuntimeHandle {
1777 fn try_query_application(
1779 &mut self,
1780 queried_id: ApplicationId,
1781 argument: Vec<u8>,
1782 ) -> Result<Vec<u8>, ExecutionError> {
1783 let service = {
1784 let mut this = self.inner();
1785
1786 let application = this.load_service_instance(self.clone(), queried_id)?;
1788 this.push_application(ApplicationStatus {
1790 caller_id: None,
1791 id: queried_id,
1792 description: application.description,
1793 signer: None,
1794 });
1795 application.instance
1796 };
1797 let response = service
1798 .try_lock()
1799 .expect("Applications should not have reentrant calls")
1800 .handle_query(argument)?;
1801 self.inner().pop_application();
1802 Ok(response)
1803 }
1804
1805 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1806 let mut this = self.inner();
1807 let application_id = this.current_application().id;
1808
1809 this.scheduled_operations.push(Operation::User {
1810 application_id,
1811 bytes: operation,
1812 });
1813
1814 Ok(())
1815 }
1816
1817 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1818 if let Some(deadline) = self.inner().deadline {
1819 if Instant::now() >= deadline {
1820 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1821 }
1822 }
1823 Ok(())
1824 }
1825}
1826
/// A request handled by [`ServiceSyncRuntime::run`].
pub enum ServiceRuntimeRequest {
    /// Runs a query against an application's service.
    Query {
        /// The application whose service should handle the query.
        application_id: ApplicationId,
        /// The context the runtime is prepared for before the query runs.
        context: QueryContext,
        /// The raw query bytes passed to the service.
        query: Vec<u8>,
        /// Channel on which the query outcome, or the error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1836
/// The parts of a message context that the runtime keeps for the message being executed.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// The ID of the message, copied from the context's `message_id`.
    id: MessageId,
    /// The context's `is_bouncing` flag.
    is_bouncing: bool,
}
1843
1844impl From<&MessageContext> for ExecutingMessage {
1845 fn from(context: &MessageContext) -> Self {
1846 ExecutingMessage {
1847 id: context.message_id,
1848 is_bouncing: context.is_bouncing,
1849 }
1850 }
1851}
1852
1853pub fn create_bytecode_blobs_sync(
1855 contract: Bytecode,
1856 service: Bytecode,
1857 vm_runtime: VmRuntime,
1858) -> (Vec<Blob>, ModuleId) {
1859 match vm_runtime {
1860 VmRuntime::Wasm => {
1861 let compressed_contract = contract.compress();
1862 let compressed_service = service.compress();
1863 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1864 let service_blob = Blob::new_service_bytecode(compressed_service);
1865 let module_id =
1866 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1867 (vec![contract_blob, service_blob], module_id)
1868 }
1869 VmRuntime::Evm => {
1870 let compressed_contract = contract.compress();
1871 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1872 let module_id = ModuleId::new(
1873 evm_contract_blob.id().hash,
1874 evm_contract_blob.id().hash,
1875 vm_runtime,
1876 );
1877 (vec![evm_contract_blob], module_id)
1878 }
1879 }
1880}