1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29 execution::UserAction,
30 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31 resources::ResourceController,
32 system::CreateApplicationResult,
33 util::{ReceiverExt, UnboundedSenderExt},
34 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
// The unit tests for this module live in a separate file and are only compiled in test builds.
#[cfg(test)]
#[path = "unit_tests/runtime_tests.rs"]
mod tests;
43
/// Associates a kind of user instance with the extra context value that the runtime
/// stores alongside it (see the `user_context` field of `SyncRuntimeInternal`).
pub trait WithContext {
    /// The type of the additional context kept by a runtime for this kind of instance.
    type UserContext;
}
47
48impl WithContext for UserContractInstance {
49 type UserContext = Timestamp;
50}
51
52impl WithContext for UserServiceInstance {
53 type UserContext = ();
54}
55
56#[cfg(test)]
57impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
58 type UserContext = ();
59}
60
61#[derive(Debug)]
62pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
63
64pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
65
66pub struct ServiceSyncRuntime {
67 runtime: SyncRuntime<UserServiceInstance>,
68 current_context: QueryContext,
69}
70
71#[derive(Debug)]
72pub struct SyncRuntimeHandle<UserInstance: WithContext>(
73 Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
74);
75
76pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
77pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
78
79#[derive(Debug)]
81pub struct SyncRuntimeInternal<UserInstance: WithContext> {
82 chain_id: ChainId,
84 height: BlockHeight,
87 round: Option<u32>,
89 #[debug(skip_if = Option::is_none)]
91 executing_message: Option<ExecutingMessage>,
92
93 execution_state_sender: ExecutionStateSender,
95
96 is_finalizing: bool,
100 applications_to_finalize: Vec<ApplicationId>,
102
103 loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
105 call_stack: Vec<ApplicationStatus>,
107 active_applications: HashSet<ApplicationId>,
109 scheduled_operations: Vec<Operation>,
111
112 view_user_states: BTreeMap<ApplicationId, ViewUserState>,
114
115 deadline: Option<Instant>,
119
120 #[debug(skip_if = Option::is_none)]
122 refund_grant_to: Option<Account>,
123 resource_controller: ResourceController,
125 user_context: UserInstance::UserContext,
127}
128
129#[derive(Debug)]
131struct ApplicationStatus {
132 caller_id: Option<ApplicationId>,
134 id: ApplicationId,
136 description: ApplicationDescription,
138 signer: Option<AccountOwner>,
140}
141
142#[derive(Debug)]
144struct LoadedApplication<Instance> {
145 instance: Arc<Mutex<Instance>>,
146 description: ApplicationDescription,
147}
148
149impl<Instance> LoadedApplication<Instance> {
150 fn new(instance: Instance, description: ApplicationDescription) -> Self {
152 LoadedApplication {
153 instance: Arc::new(Mutex::new(instance)),
154 description,
155 }
156 }
157}
158
159impl<Instance> Clone for LoadedApplication<Instance> {
160 fn clone(&self) -> Self {
163 LoadedApplication {
164 instance: self.instance.clone(),
165 description: self.description.clone(),
166 }
167 }
168}
169
170#[derive(Debug)]
171enum Promise<T> {
172 Ready(T),
173 Pending(Receiver<T>),
174}
175
176impl<T> Promise<T> {
177 fn force(&mut self) -> Result<(), ExecutionError> {
178 if let Promise::Pending(receiver) = self {
179 let value = receiver
180 .recv_ref()
181 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
182 *self = Promise::Ready(value);
183 }
184 Ok(())
185 }
186
187 fn read(self) -> Result<T, ExecutionError> {
188 match self {
189 Promise::Pending(receiver) => {
190 let value = receiver.recv_response()?;
191 Ok(value)
192 }
193 Promise::Ready(value) => Ok(value),
194 }
195 }
196}
197
198#[derive(Debug, Default)]
200struct QueryManager<T> {
201 pending_queries: BTreeMap<u32, Promise<T>>,
203 query_count: u32,
205 active_query_count: u32,
207}
208
209impl<T> QueryManager<T> {
210 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
211 let id = self.query_count;
212 self.pending_queries.insert(id, Promise::Pending(receiver));
213 self.query_count = self
214 .query_count
215 .checked_add(1)
216 .ok_or(ArithmeticError::Overflow)?;
217 self.active_query_count = self
218 .active_query_count
219 .checked_add(1)
220 .ok_or(ArithmeticError::Overflow)?;
221 Ok(id)
222 }
223
224 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
225 let promise = self
226 .pending_queries
227 .remove(&id)
228 .ok_or(ExecutionError::InvalidPromise)?;
229 let value = promise.read()?;
230 self.active_query_count -= 1;
231 Ok(value)
232 }
233
234 fn force_all(&mut self) -> Result<(), ExecutionError> {
235 for promise in self.pending_queries.values_mut() {
236 promise.force()?;
237 }
238 Ok(())
239 }
240}
241
/// A list of keys of the key-value store.
type Keys = Vec<Vec<u8>>;
/// A value of the key-value store.
type Value = Vec<u8>;
/// A list of key-value pairs of the key-value store.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
245
246#[derive(Debug, Default)]
247struct ViewUserState {
248 contains_key_queries: QueryManager<bool>,
250 contains_keys_queries: QueryManager<Vec<bool>>,
252 read_value_queries: QueryManager<Option<Value>>,
254 read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
256 find_keys_queries: QueryManager<Keys>,
258 find_key_values_queries: QueryManager<KeyValues>,
260}
261
262impl ViewUserState {
263 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
264 self.contains_key_queries.force_all()?;
265 self.contains_keys_queries.force_all()?;
266 self.read_value_queries.force_all()?;
267 self.read_multi_values_queries.force_all()?;
268 self.find_keys_queries.force_all()?;
269 self.find_key_values_queries.force_all()?;
270 Ok(())
271 }
272}
273
274impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
275 type Target = SyncRuntimeHandle<UserInstance>;
276
277 fn deref(&self) -> &Self::Target {
278 self.0.as_ref().expect(
279 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
280 )
281 }
282}
283
284impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
285 fn deref_mut(&mut self) -> &mut Self::Target {
286 self.0.as_mut().expect(
287 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288 )
289 }
290}
291
292impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
293 fn drop(&mut self) {
294 if let Some(handle) = self.0.take() {
297 handle.inner().loaded_applications.clear();
298 }
299 }
300}
301
302impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
303 #[expect(clippy::too_many_arguments)]
304 fn new(
305 chain_id: ChainId,
306 height: BlockHeight,
307 round: Option<u32>,
308 executing_message: Option<ExecutingMessage>,
309 execution_state_sender: ExecutionStateSender,
310 deadline: Option<Instant>,
311 refund_grant_to: Option<Account>,
312 resource_controller: ResourceController,
313 user_context: UserInstance::UserContext,
314 ) -> Self {
315 Self {
316 chain_id,
317 height,
318 round,
319 executing_message,
320 execution_state_sender,
321 is_finalizing: false,
322 applications_to_finalize: Vec::new(),
323 loaded_applications: HashMap::new(),
324 call_stack: Vec::new(),
325 active_applications: HashSet::new(),
326 view_user_states: BTreeMap::new(),
327 deadline,
328 refund_grant_to,
329 resource_controller,
330 scheduled_operations: Vec::new(),
331 user_context,
332 }
333 }
334
335 fn current_application(&self) -> &ApplicationStatus {
343 self.call_stack
344 .last()
345 .expect("Call stack is unexpectedly empty")
346 }
347
348 fn push_application(&mut self, status: ApplicationStatus) {
352 self.active_applications.insert(status.id);
353 self.call_stack.push(status);
354 }
355
356 fn pop_application(&mut self) -> ApplicationStatus {
364 let status = self
365 .call_stack
366 .pop()
367 .expect("Can't remove application from empty call stack");
368 assert!(self.active_applications.remove(&status.id));
369 status
370 }
371
372 fn check_for_reentrancy(
376 &mut self,
377 application_id: ApplicationId,
378 ) -> Result<(), ExecutionError> {
379 ensure!(
380 !self.active_applications.contains(&application_id),
381 ExecutionError::ReentrantCall(application_id)
382 );
383 Ok(())
384 }
385}
386
387impl SyncRuntimeInternal<UserContractInstance> {
388 fn load_contract_instance(
390 &mut self,
391 this: SyncRuntimeHandle<UserContractInstance>,
392 id: ApplicationId,
393 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
394 match self.loaded_applications.entry(id) {
395 #[cfg(web)]
397 hash_map::Entry::Vacant(_) => {
398 drop(this);
399 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
400 id,
401 )))
402 }
403 #[cfg(not(web))]
404 hash_map::Entry::Vacant(entry) => {
405 let (code, description) = self
406 .execution_state_sender
407 .send_request(move |callback| ExecutionRequest::LoadContract { id, callback })?
408 .recv_response()?;
409
410 let instance = code.instantiate(this)?;
411
412 self.applications_to_finalize.push(id);
413 Ok(entry
414 .insert(LoadedApplication::new(instance, description))
415 .clone())
416 }
417 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
418 }
419 }
420
421 fn prepare_for_call(
423 &mut self,
424 this: ContractSyncRuntimeHandle,
425 authenticated: bool,
426 callee_id: ApplicationId,
427 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
428 self.check_for_reentrancy(callee_id)?;
429
430 ensure!(
431 !self.is_finalizing,
432 ExecutionError::CrossApplicationCallInFinalize {
433 caller_id: Box::new(self.current_application().id),
434 callee_id: Box::new(callee_id),
435 }
436 );
437
438 let application = self.load_contract_instance(this, callee_id)?;
440
441 let caller = self.current_application();
442 let caller_id = caller.id;
443 let caller_signer = caller.signer;
444 let authenticated_signer = match caller_signer {
446 Some(signer) if authenticated => Some(signer),
447 _ => None,
448 };
449 let authenticated_caller_id = authenticated.then_some(caller_id);
450 self.push_application(ApplicationStatus {
451 caller_id: authenticated_caller_id,
452 id: callee_id,
453 description: application.description,
454 signer: authenticated_signer,
456 });
457 Ok(application.instance)
458 }
459
460 fn finish_call(&mut self) {
462 self.pop_application();
463 }
464
465 fn run_service_oracle_query(
467 &mut self,
468 application_id: ApplicationId,
469 query: Vec<u8>,
470 ) -> Result<Vec<u8>, ExecutionError> {
471 let timeout = self
472 .resource_controller
473 .remaining_service_oracle_execution_time()?;
474 let execution_start = Instant::now();
475 let deadline = Some(execution_start + timeout);
476 let response = self
477 .execution_state_sender
478 .send_request(|callback| ExecutionRequest::QueryServiceOracle {
479 deadline,
480 application_id,
481 next_block_height: self.height,
482 query,
483 callback,
484 })?
485 .recv_response()?;
486
487 self.resource_controller
488 .track_service_oracle_execution(execution_start.elapsed())?;
489 self.resource_controller
490 .track_service_oracle_response(response.len())?;
491
492 Ok(response)
493 }
494}
495
496impl SyncRuntimeInternal<UserServiceInstance> {
497 fn load_service_instance(
499 &mut self,
500 this: ServiceSyncRuntimeHandle,
501 id: ApplicationId,
502 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
503 match self.loaded_applications.entry(id) {
504 #[cfg(web)]
506 hash_map::Entry::Vacant(_) => {
507 drop(this);
508 Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
509 id,
510 )))
511 }
512 #[cfg(not(web))]
513 hash_map::Entry::Vacant(entry) => {
514 let (code, description) = self
515 .execution_state_sender
516 .send_request(move |callback| ExecutionRequest::LoadService { id, callback })?
517 .recv_response()?;
518
519 let instance = code.instantiate(this)?;
520 Ok(entry
521 .insert(LoadedApplication::new(instance, description))
522 .clone())
523 }
524 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
525 }
526 }
527}
528
529impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
530 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
531 let handle = self.0.take().expect(
532 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
533 );
534 let runtime = Arc::into_inner(handle.0)?
535 .into_inner()
536 .expect("`SyncRuntime` should run in a single thread which should not panic");
537 Some(runtime)
538 }
539}
540
541impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
542 for SyncRuntimeHandle<UserInstance>
543{
544 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
545 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
546 }
547}
548
549impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
550 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
551 self.0
552 .try_lock()
553 .expect("Synchronous runtimes run on a single execution thread")
554 }
555}
556
557impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
558where
559 Self: ContractOrServiceRuntime,
560{
561 type Read = ();
562 type ReadValueBytes = u32;
563 type ContainsKey = u32;
564 type ContainsKeys = u32;
565 type ReadMultiValuesBytes = u32;
566 type FindKeysByPrefix = u32;
567 type FindKeyValuesByPrefix = u32;
568
569 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
570 let mut this = self.inner();
571 let chain_id = this.chain_id;
572 this.resource_controller.track_runtime_chain_id()?;
573 Ok(chain_id)
574 }
575
576 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
577 let mut this = self.inner();
578 let height = this.height;
579 this.resource_controller.track_runtime_block_height()?;
580 Ok(height)
581 }
582
583 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
584 let mut this = self.inner();
585 let application_id = this.current_application().id;
586 this.resource_controller.track_runtime_application_id()?;
587 Ok(application_id)
588 }
589
590 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
591 let mut this = self.inner();
592 let application_creator_chain_id = this.current_application().description.creator_chain_id;
593 this.resource_controller.track_runtime_application_id()?;
594 Ok(application_creator_chain_id)
595 }
596
597 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
598 let mut this = self.inner();
599 let parameters = this.current_application().description.parameters.clone();
600 this.resource_controller
601 .track_runtime_application_parameters(¶meters)?;
602 Ok(parameters)
603 }
604
605 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
606 let mut this = self.inner();
607 let timestamp = this
608 .execution_state_sender
609 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
610 .recv_response()?;
611 this.resource_controller.track_runtime_timestamp()?;
612 Ok(timestamp)
613 }
614
615 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
616 let mut this = self.inner();
617 let balance = this
618 .execution_state_sender
619 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
620 .recv_response()?;
621 this.resource_controller.track_runtime_balance()?;
622 Ok(balance)
623 }
624
625 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
626 let mut this = self.inner();
627 let balance = this
628 .execution_state_sender
629 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
630 .recv_response()?;
631 this.resource_controller.track_runtime_balance()?;
632 Ok(balance)
633 }
634
635 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
636 let mut this = self.inner();
637 let owner_balances = this
638 .execution_state_sender
639 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
640 .recv_response()?;
641 this.resource_controller
642 .track_runtime_owner_balances(&owner_balances)?;
643 Ok(owner_balances)
644 }
645
646 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
647 let mut this = self.inner();
648 let owners = this
649 .execution_state_sender
650 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
651 .recv_response()?;
652 this.resource_controller.track_runtime_owners(&owners)?;
653 Ok(owners)
654 }
655
656 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
657 let mut this = self.inner();
658 let chain_ownership = this
659 .execution_state_sender
660 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
661 .recv_response()?;
662 this.resource_controller
663 .track_runtime_chain_ownership(&chain_ownership)?;
664 Ok(chain_ownership)
665 }
666
667 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
668 let mut this = self.inner();
669 let id = this.current_application().id;
670 this.resource_controller.track_read_operation()?;
671 let receiver = this
672 .execution_state_sender
673 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
674 let state = this.view_user_states.entry(id).or_default();
675 state.contains_key_queries.register(receiver)
676 }
677
678 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
679 let mut this = self.inner();
680 let id = this.current_application().id;
681 let state = this.view_user_states.entry(id).or_default();
682 let value = state.contains_key_queries.wait(*promise)?;
683 Ok(value)
684 }
685
686 fn contains_keys_new(
687 &mut self,
688 keys: Vec<Vec<u8>>,
689 ) -> Result<Self::ContainsKeys, ExecutionError> {
690 let mut this = self.inner();
691 let id = this.current_application().id;
692 this.resource_controller.track_read_operation()?;
693 let receiver = this
694 .execution_state_sender
695 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
696 let state = this.view_user_states.entry(id).or_default();
697 state.contains_keys_queries.register(receiver)
698 }
699
700 fn contains_keys_wait(
701 &mut self,
702 promise: &Self::ContainsKeys,
703 ) -> Result<Vec<bool>, ExecutionError> {
704 let mut this = self.inner();
705 let id = this.current_application().id;
706 let state = this.view_user_states.entry(id).or_default();
707 let value = state.contains_keys_queries.wait(*promise)?;
708 Ok(value)
709 }
710
711 fn read_multi_values_bytes_new(
712 &mut self,
713 keys: Vec<Vec<u8>>,
714 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
715 let mut this = self.inner();
716 let id = this.current_application().id;
717 this.resource_controller.track_read_operation()?;
718 let receiver = this.execution_state_sender.send_request(move |callback| {
719 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
720 })?;
721 let state = this.view_user_states.entry(id).or_default();
722 state.read_multi_values_queries.register(receiver)
723 }
724
725 fn read_multi_values_bytes_wait(
726 &mut self,
727 promise: &Self::ReadMultiValuesBytes,
728 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
729 let mut this = self.inner();
730 let id = this.current_application().id;
731 let state = this.view_user_states.entry(id).or_default();
732 let values = state.read_multi_values_queries.wait(*promise)?;
733 for value in &values {
734 if let Some(value) = &value {
735 this.resource_controller
736 .track_bytes_read(value.len() as u64)?;
737 }
738 }
739 Ok(values)
740 }
741
742 fn read_value_bytes_new(
743 &mut self,
744 key: Vec<u8>,
745 ) -> Result<Self::ReadValueBytes, ExecutionError> {
746 let mut this = self.inner();
747 let id = this.current_application().id;
748 this.resource_controller.track_read_operation()?;
749 let receiver = this
750 .execution_state_sender
751 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
752 let state = this.view_user_states.entry(id).or_default();
753 state.read_value_queries.register(receiver)
754 }
755
756 fn read_value_bytes_wait(
757 &mut self,
758 promise: &Self::ReadValueBytes,
759 ) -> Result<Option<Vec<u8>>, ExecutionError> {
760 let mut this = self.inner();
761 let id = this.current_application().id;
762 let value = {
763 let state = this.view_user_states.entry(id).or_default();
764 state.read_value_queries.wait(*promise)?
765 };
766 if let Some(value) = &value {
767 this.resource_controller
768 .track_bytes_read(value.len() as u64)?;
769 }
770 Ok(value)
771 }
772
773 fn find_keys_by_prefix_new(
774 &mut self,
775 key_prefix: Vec<u8>,
776 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
777 let mut this = self.inner();
778 let id = this.current_application().id;
779 this.resource_controller.track_read_operation()?;
780 let receiver = this.execution_state_sender.send_request(move |callback| {
781 ExecutionRequest::FindKeysByPrefix {
782 id,
783 key_prefix,
784 callback,
785 }
786 })?;
787 let state = this.view_user_states.entry(id).or_default();
788 state.find_keys_queries.register(receiver)
789 }
790
791 fn find_keys_by_prefix_wait(
792 &mut self,
793 promise: &Self::FindKeysByPrefix,
794 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
795 let mut this = self.inner();
796 let id = this.current_application().id;
797 let keys = {
798 let state = this.view_user_states.entry(id).or_default();
799 state.find_keys_queries.wait(*promise)?
800 };
801 let mut read_size = 0;
802 for key in &keys {
803 read_size += key.len();
804 }
805 this.resource_controller
806 .track_bytes_read(read_size as u64)?;
807 Ok(keys)
808 }
809
810 fn find_key_values_by_prefix_new(
811 &mut self,
812 key_prefix: Vec<u8>,
813 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
814 let mut this = self.inner();
815 let id = this.current_application().id;
816 this.resource_controller.track_read_operation()?;
817 let receiver = this.execution_state_sender.send_request(move |callback| {
818 ExecutionRequest::FindKeyValuesByPrefix {
819 id,
820 key_prefix,
821 callback,
822 }
823 })?;
824 let state = this.view_user_states.entry(id).or_default();
825 state.find_key_values_queries.register(receiver)
826 }
827
828 fn find_key_values_by_prefix_wait(
829 &mut self,
830 promise: &Self::FindKeyValuesByPrefix,
831 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
832 let mut this = self.inner();
833 let id = this.current_application().id;
834 let state = this.view_user_states.entry(id).or_default();
835 let key_values = state.find_key_values_queries.wait(*promise)?;
836 let mut read_size = 0;
837 for (key, value) in &key_values {
838 read_size += key.len() + value.len();
839 }
840 this.resource_controller
841 .track_bytes_read(read_size as u64)?;
842 Ok(key_values)
843 }
844
845 fn perform_http_request(
846 &mut self,
847 request: http::Request,
848 ) -> Result<http::Response, ExecutionError> {
849 let mut this = self.inner();
850 let app_permissions = this
851 .execution_state_sender
852 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
853 .recv_response()?;
854
855 let app_id = this.current_application().id;
856 ensure!(
857 app_permissions.can_make_http_requests(&app_id),
858 ExecutionError::UnauthorizedApplication(app_id)
859 );
860
861 this.resource_controller.track_http_request()?;
862
863 this.execution_state_sender
864 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
865 request,
866 http_responses_are_oracle_responses:
867 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
868 callback,
869 })?
870 .recv_response()
871 }
872
873 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
874 let this = self.inner();
875 this.execution_state_sender
876 .send_request(|callback| ExecutionRequest::AssertBefore {
877 timestamp,
878 callback,
879 })?
880 .recv_response()?
881 }
882
883 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
884 let this = self.inner();
885 let blob_id = hash.into();
886 let content = this
887 .execution_state_sender
888 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
889 .recv_response()?;
890 Ok(content.into_vec_or_clone())
891 }
892
893 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
894 let this = self.inner();
895 let blob_id = hash.into();
896 this.execution_state_sender
897 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
898 .recv_response()
899 }
900}
901
/// Behavior that differs between runtimes that execute contracts and runtimes that execute
/// services.
trait ContractOrServiceRuntime {
    /// The value sent as `http_responses_are_oracle_responses` when an HTTP request is
    /// performed.
    // NOTE(review): judging by the name, `true` limits HTTP responses to the maximum size
    // of an oracle response — confirm where the request is handled.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
912
913impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
914 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
915}
916
917impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
918 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
919}
920
921impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
922 fn clone(&self) -> Self {
923 SyncRuntimeHandle(self.0.clone())
924 }
925}
926
927impl ContractSyncRuntime {
928 pub(crate) fn new(
929 execution_state_sender: ExecutionStateSender,
930 chain_id: ChainId,
931 refund_grant_to: Option<Account>,
932 resource_controller: ResourceController,
933 action: &UserAction,
934 ) -> Self {
935 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
936 SyncRuntimeInternal::new(
937 chain_id,
938 action.height(),
939 action.round(),
940 if let UserAction::Message(context, _) = action {
941 Some(context.into())
942 } else {
943 None
944 },
945 execution_state_sender,
946 None,
947 refund_grant_to,
948 resource_controller,
949 action.timestamp(),
950 ),
951 )))
952 }
953
954 pub(crate) fn preload_contract(
955 &self,
956 id: ApplicationId,
957 code: UserContractCode,
958 description: ApplicationDescription,
959 ) -> Result<(), ExecutionError> {
960 let this = self
961 .0
962 .as_ref()
963 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
964 let runtime_handle = this.clone();
965 let mut this_guard = this.inner();
966
967 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
968 entry.insert(LoadedApplication::new(
969 code.instantiate(runtime_handle)?,
970 description,
971 ));
972 this_guard.applications_to_finalize.push(id);
973 }
974
975 Ok(())
976 }
977
978 pub(crate) fn run_action(
980 mut self,
981 application_id: ApplicationId,
982 chain_id: ChainId,
983 action: UserAction,
984 ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
985 let result = self
986 .deref_mut()
987 .run_action(application_id, chain_id, action)?;
988 let runtime = self
989 .into_inner()
990 .expect("Runtime clones should have been freed by now");
991
992 Ok((result, runtime.resource_controller))
993 }
994}
995
996impl ContractSyncRuntimeHandle {
997 fn run_action(
998 &mut self,
999 application_id: ApplicationId,
1000 chain_id: ChainId,
1001 action: UserAction,
1002 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1003 let finalize_context = FinalizeContext {
1004 authenticated_signer: action.signer(),
1005 chain_id,
1006 height: action.height(),
1007 round: action.round(),
1008 };
1009
1010 {
1011 let runtime = self.inner();
1012 assert_eq!(runtime.chain_id, chain_id);
1013 assert_eq!(runtime.height, action.height());
1014 }
1015
1016 let signer = action.signer();
1017 let closure = move |code: &mut UserContractInstance| match action {
1018 UserAction::Instantiate(_context, argument) => {
1019 code.instantiate(argument).map(|()| None)
1020 }
1021 UserAction::Operation(_context, operation) => {
1022 code.execute_operation(operation).map(Option::Some)
1023 }
1024 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1025 UserAction::ProcessStreams(_context, updates) => {
1026 code.process_streams(updates).map(|()| None)
1027 }
1028 };
1029
1030 let result = self.execute(application_id, signer, closure)?;
1031 self.finalize(finalize_context)?;
1032 Ok(result)
1033 }
1034
1035 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1037 let applications = mem::take(&mut self.inner().applications_to_finalize)
1038 .into_iter()
1039 .rev();
1040
1041 self.inner().is_finalizing = true;
1042
1043 for application in applications {
1044 self.execute(application, context.authenticated_signer, |contract| {
1045 contract.finalize().map(|_| None)
1046 })?;
1047 self.inner().loaded_applications.remove(&application);
1048 }
1049
1050 Ok(())
1051 }
1052
1053 fn execute(
1055 &mut self,
1056 application_id: ApplicationId,
1057 signer: Option<AccountOwner>,
1058 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1059 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1060 let contract = {
1061 let mut runtime = self.inner();
1062 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1063
1064 let status = ApplicationStatus {
1065 caller_id: None,
1066 id: application_id,
1067 description: application.description.clone(),
1068 signer,
1069 };
1070
1071 runtime.push_application(status);
1072
1073 application
1074 };
1075
1076 let result = closure(
1077 &mut contract
1078 .instance
1079 .try_lock()
1080 .expect("Application should not be already executing"),
1081 )?;
1082
1083 let mut runtime = self.inner();
1084 let application_status = runtime.pop_application();
1085 assert_eq!(application_status.caller_id, None);
1086 assert_eq!(application_status.id, application_id);
1087 assert_eq!(application_status.description, contract.description);
1088 assert_eq!(application_status.signer, signer);
1089 assert!(runtime.call_stack.is_empty());
1090
1091 Ok(result)
1092 }
1093}
1094
1095impl ContractRuntime for ContractSyncRuntimeHandle {
1096 fn authenticated_signer(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1097 let this = self.inner();
1098 Ok(this.current_application().signer)
1099 }
1100
1101 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1102 Ok(self
1103 .inner()
1104 .executing_message
1105 .map(|metadata| metadata.is_bouncing))
1106 }
1107
1108 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1109 Ok(self
1110 .inner()
1111 .executing_message
1112 .map(|metadata| metadata.origin))
1113 }
1114
1115 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1116 let this = self.inner();
1117 if this.call_stack.len() <= 1 {
1118 return Ok(None);
1119 }
1120 Ok(this.current_application().caller_id)
1121 }
1122
1123 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1124 Ok(match vm_runtime {
1125 VmRuntime::Wasm => {
1126 self.inner()
1127 .resource_controller
1128 .policy()
1129 .maximum_wasm_fuel_per_block
1130 }
1131 VmRuntime::Evm => {
1132 self.inner()
1133 .resource_controller
1134 .policy()
1135 .maximum_evm_fuel_per_block
1136 }
1137 })
1138 }
1139
1140 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1141 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1142 }
1143
1144 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1145 let mut this = self.inner();
1146 this.resource_controller.track_fuel(fuel, vm_runtime)
1147 }
1148
1149 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1150 let mut this = self.inner();
1151 let application = this.current_application();
1152 let application_id = application.id;
1153 let authenticated_signer = application.signer;
1154 let mut refund_grant_to = this.refund_grant_to;
1155
1156 let grant = this
1157 .resource_controller
1158 .policy()
1159 .total_price(&message.grant)?;
1160 if grant.is_zero() {
1161 refund_grant_to = None;
1162 } else {
1163 this.resource_controller.track_grant(grant)?;
1164 }
1165 let kind = if message.is_tracked {
1166 MessageKind::Tracked
1167 } else {
1168 MessageKind::Simple
1169 };
1170
1171 this.execution_state_sender
1172 .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1173 message: OutgoingMessage {
1174 destination: message.destination,
1175 authenticated_signer,
1176 refund_grant_to,
1177 grant,
1178 kind,
1179 message: Message::User {
1180 application_id,
1181 bytes: message.message,
1182 },
1183 },
1184 callback,
1185 })?
1186 .recv_response()?;
1187
1188 Ok(())
1189 }
1190
1191 fn transfer(
1192 &mut self,
1193 source: AccountOwner,
1194 destination: Account,
1195 amount: Amount,
1196 ) -> Result<(), ExecutionError> {
1197 let this = self.inner();
1198 let current_application = this.current_application();
1199 let application_id = current_application.id;
1200 let signer = current_application.signer;
1201
1202 this.execution_state_sender
1203 .send_request(|callback| ExecutionRequest::Transfer {
1204 source,
1205 destination,
1206 amount,
1207 signer,
1208 application_id,
1209 callback,
1210 })?
1211 .recv_response()?;
1212 Ok(())
1213 }
1214
1215 fn claim(
1216 &mut self,
1217 source: Account,
1218 destination: Account,
1219 amount: Amount,
1220 ) -> Result<(), ExecutionError> {
1221 let this = self.inner();
1222 let current_application = this.current_application();
1223 let application_id = current_application.id;
1224 let signer = current_application.signer;
1225
1226 this.execution_state_sender
1227 .send_request(|callback| ExecutionRequest::Claim {
1228 source,
1229 destination,
1230 amount,
1231 signer,
1232 application_id,
1233 callback,
1234 })?
1235 .recv_response()?;
1236 Ok(())
1237 }
1238
1239 fn try_call_application(
1240 &mut self,
1241 authenticated: bool,
1242 callee_id: ApplicationId,
1243 argument: Vec<u8>,
1244 ) -> Result<Vec<u8>, ExecutionError> {
1245 let contract = self
1246 .inner()
1247 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1248
1249 let value = contract
1250 .try_lock()
1251 .expect("Applications should not have reentrant calls")
1252 .execute_operation(argument)?;
1253
1254 self.inner().finish_call();
1255
1256 Ok(value)
1257 }
1258
1259 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1260 let mut this = self.inner();
1261 ensure!(
1262 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1263 ExecutionError::StreamNameTooLong
1264 );
1265 let application_id = GenericApplicationId::User(this.current_application().id);
1266 let stream_id = StreamId {
1267 stream_name,
1268 application_id,
1269 };
1270 let value_len = value.len() as u64;
1271 let index = this
1272 .execution_state_sender
1273 .send_request(|callback| ExecutionRequest::Emit {
1274 stream_id,
1275 value,
1276 callback,
1277 })?
1278 .recv_response()?;
1279 this.resource_controller.track_bytes_written(value_len)?;
1281 Ok(index)
1282 }
1283
1284 fn read_event(
1285 &mut self,
1286 chain_id: ChainId,
1287 stream_name: StreamName,
1288 index: u32,
1289 ) -> Result<Vec<u8>, ExecutionError> {
1290 let mut this = self.inner();
1291 ensure!(
1292 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1293 ExecutionError::StreamNameTooLong
1294 );
1295 let application_id = GenericApplicationId::User(this.current_application().id);
1296 let stream_id = StreamId {
1297 stream_name,
1298 application_id,
1299 };
1300 let event_id = EventId {
1301 stream_id,
1302 index,
1303 chain_id,
1304 };
1305 let event = this
1306 .execution_state_sender
1307 .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1308 .recv_response()?;
1309 this.resource_controller
1311 .track_bytes_read(event.len() as u64)?;
1312 Ok(event)
1313 }
1314
1315 fn subscribe_to_events(
1316 &mut self,
1317 chain_id: ChainId,
1318 application_id: ApplicationId,
1319 stream_name: StreamName,
1320 ) -> Result<(), ExecutionError> {
1321 let this = self.inner();
1322 ensure!(
1323 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1324 ExecutionError::StreamNameTooLong
1325 );
1326 let stream_id = StreamId {
1327 stream_name,
1328 application_id: application_id.into(),
1329 };
1330 let subscriber_app_id = this.current_application().id;
1331 this.execution_state_sender
1332 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1333 chain_id,
1334 stream_id,
1335 subscriber_app_id,
1336 callback,
1337 })?
1338 .recv_response()?;
1339 Ok(())
1340 }
1341
1342 fn unsubscribe_from_events(
1343 &mut self,
1344 chain_id: ChainId,
1345 application_id: ApplicationId,
1346 stream_name: StreamName,
1347 ) -> Result<(), ExecutionError> {
1348 let this = self.inner();
1349 ensure!(
1350 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1351 ExecutionError::StreamNameTooLong
1352 );
1353 let stream_id = StreamId {
1354 stream_name,
1355 application_id: application_id.into(),
1356 };
1357 let subscriber_app_id = this.current_application().id;
1358 this.execution_state_sender
1359 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1360 chain_id,
1361 stream_id,
1362 subscriber_app_id,
1363 callback,
1364 })?
1365 .recv_response()?;
1366 Ok(())
1367 }
1368
1369 fn query_service(
1370 &mut self,
1371 application_id: ApplicationId,
1372 query: Vec<u8>,
1373 ) -> Result<Vec<u8>, ExecutionError> {
1374 let mut this = self.inner();
1375
1376 let app_permissions = this
1377 .execution_state_sender
1378 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1379 .recv_response()?;
1380
1381 let app_id = this.current_application().id;
1382 ensure!(
1383 app_permissions.can_call_services(&app_id),
1384 ExecutionError::UnauthorizedApplication(app_id)
1385 );
1386
1387 this.resource_controller.track_service_oracle_call()?;
1388
1389 this.run_service_oracle_query(application_id, query)
1390 }
1391
1392 fn open_chain(
1393 &mut self,
1394 ownership: ChainOwnership,
1395 application_permissions: ApplicationPermissions,
1396 balance: Amount,
1397 ) -> Result<ChainId, ExecutionError> {
1398 let parent_id = self.inner().chain_id;
1399 let block_height = self.block_height()?;
1400
1401 let timestamp = self.inner().user_context;
1402
1403 let chain_id = self
1404 .inner()
1405 .execution_state_sender
1406 .send_request(|callback| ExecutionRequest::OpenChain {
1407 ownership,
1408 balance,
1409 parent_id,
1410 block_height,
1411 timestamp,
1412 application_permissions,
1413 callback,
1414 })?
1415 .recv_response()?;
1416
1417 Ok(chain_id)
1418 }
1419
1420 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1421 let this = self.inner();
1422 let application_id = this.current_application().id;
1423 this.execution_state_sender
1424 .send_request(|callback| ExecutionRequest::CloseChain {
1425 application_id,
1426 callback,
1427 })?
1428 .recv_response()?
1429 }
1430
1431 fn change_application_permissions(
1432 &mut self,
1433 application_permissions: ApplicationPermissions,
1434 ) -> Result<(), ExecutionError> {
1435 let this = self.inner();
1436 let application_id = this.current_application().id;
1437 this.execution_state_sender
1438 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1439 application_id,
1440 application_permissions,
1441 callback,
1442 })?
1443 .recv_response()?
1444 }
1445
1446 fn create_application(
1447 &mut self,
1448 module_id: ModuleId,
1449 parameters: Vec<u8>,
1450 argument: Vec<u8>,
1451 required_application_ids: Vec<ApplicationId>,
1452 ) -> Result<ApplicationId, ExecutionError> {
1453 let chain_id = self.inner().chain_id;
1454 let block_height = self.block_height()?;
1455
1456 let CreateApplicationResult { app_id } = self
1457 .inner()
1458 .execution_state_sender
1459 .send_request(move |callback| ExecutionRequest::CreateApplication {
1460 chain_id,
1461 block_height,
1462 module_id,
1463 parameters,
1464 required_application_ids,
1465 callback,
1466 })?
1467 .recv_response()?;
1468
1469 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1470
1471 contract
1472 .try_lock()
1473 .expect("Applications should not have reentrant calls")
1474 .instantiate(argument)?;
1475
1476 self.inner().finish_call();
1477
1478 Ok(app_id)
1479 }
1480
1481 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1482 let blob = Blob::new_data(bytes);
1483 let blob_id = blob.id();
1484 let this = self.inner();
1485 this.execution_state_sender
1486 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1487 .recv_response()?;
1488 Ok(DataBlobHash(blob_id.hash))
1489 }
1490
1491 fn publish_module(
1492 &mut self,
1493 contract: Bytecode,
1494 service: Bytecode,
1495 vm_runtime: VmRuntime,
1496 ) -> Result<ModuleId, ExecutionError> {
1497 let (blobs, module_id) =
1498 crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1499 let this = self.inner();
1500 for blob in blobs {
1501 this.execution_state_sender
1502 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1503 .recv_response()?;
1504 }
1505 Ok(module_id)
1506 }
1507
1508 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1509 let this = self.inner();
1510 let round = this.round;
1511 this.execution_state_sender
1512 .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1513 .recv_response()
1514 }
1515
1516 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1517 let mut this = self.inner();
1518 let id = this.current_application().id;
1519 let state = this.view_user_states.entry(id).or_default();
1520 state.force_all_pending_queries()?;
1521 this.resource_controller.track_write_operations(
1522 batch
1523 .num_operations()
1524 .try_into()
1525 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1526 )?;
1527 this.resource_controller
1528 .track_bytes_written(batch.size() as u64)?;
1529 this.execution_state_sender
1530 .send_request(|callback| ExecutionRequest::WriteBatch {
1531 id,
1532 batch,
1533 callback,
1534 })?
1535 .recv_response()?;
1536 Ok(())
1537 }
1538}
1539
1540impl ServiceSyncRuntime {
1541 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1543 Self::new_with_deadline(execution_state_sender, context, None)
1544 }
1545
1546 pub fn new_with_deadline(
1548 execution_state_sender: ExecutionStateSender,
1549 context: QueryContext,
1550 deadline: Option<Instant>,
1551 ) -> Self {
1552 let runtime = SyncRuntime(Some(
1553 SyncRuntimeInternal::new(
1554 context.chain_id,
1555 context.next_block_height,
1556 None,
1557 None,
1558 execution_state_sender,
1559 deadline,
1560 None,
1561 ResourceController::default(),
1562 (),
1563 )
1564 .into(),
1565 ));
1566
1567 ServiceSyncRuntime {
1568 runtime,
1569 current_context: context,
1570 }
1571 }
1572
1573 pub(crate) fn preload_service(
1575 &self,
1576 id: ApplicationId,
1577 code: UserServiceCode,
1578 description: ApplicationDescription,
1579 ) -> Result<(), ExecutionError> {
1580 let this = self
1581 .runtime
1582 .0
1583 .as_ref()
1584 .expect("services shouldn't be preloaded while the runtime is being dropped");
1585 let runtime_handle = this.clone();
1586 let mut this_guard = this.inner();
1587
1588 if let hash_map::Entry::Vacant(entry) = this_guard.loaded_applications.entry(id) {
1589 entry.insert(LoadedApplication::new(
1590 code.instantiate(runtime_handle)?,
1591 description,
1592 ));
1593 this_guard.applications_to_finalize.push(id);
1594 }
1595
1596 Ok(())
1597 }
1598
1599 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1601 while let Ok(request) = incoming_requests.recv() {
1602 let ServiceRuntimeRequest::Query {
1603 application_id,
1604 context,
1605 query,
1606 callback,
1607 } = request;
1608
1609 let result = self
1610 .prepare_for_query(context)
1611 .and_then(|()| self.run_query(application_id, query));
1612
1613 if let Err(err) = callback.send(result) {
1614 tracing::debug!(%err, "Receiver for query result has been dropped");
1615 }
1616 }
1617 }
1618
1619 pub(crate) fn prepare_for_query(
1621 &mut self,
1622 new_context: QueryContext,
1623 ) -> Result<(), ExecutionError> {
1624 let expected_context = QueryContext {
1625 local_time: new_context.local_time,
1626 ..self.current_context
1627 };
1628
1629 if new_context != expected_context {
1630 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1631 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1632 } else {
1633 self.handle_mut()
1634 .inner()
1635 .execution_state_sender
1636 .send_request(|callback| ExecutionRequest::SetLocalTime {
1637 local_time: new_context.local_time,
1638 callback,
1639 })?
1640 .recv_response()?;
1641 }
1642 Ok(())
1643 }
1644
1645 pub(crate) fn run_query(
1647 &mut self,
1648 application_id: ApplicationId,
1649 query: Vec<u8>,
1650 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1651 let this = self.handle_mut();
1652 let response = this.try_query_application(application_id, query)?;
1653 let operations = mem::take(&mut this.inner().scheduled_operations);
1654
1655 Ok(QueryOutcome {
1656 response,
1657 operations,
1658 })
1659 }
1660
1661 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1663 self.runtime.0.as_mut().expect(
1664 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1665 )
1666 }
1667}
1668
1669impl ServiceRuntime for ServiceSyncRuntimeHandle {
1670 fn try_query_application(
1672 &mut self,
1673 queried_id: ApplicationId,
1674 argument: Vec<u8>,
1675 ) -> Result<Vec<u8>, ExecutionError> {
1676 let service = {
1677 let mut this = self.inner();
1678
1679 let application = this.load_service_instance(self.clone(), queried_id)?;
1681 this.push_application(ApplicationStatus {
1683 caller_id: None,
1684 id: queried_id,
1685 description: application.description,
1686 signer: None,
1687 });
1688 application.instance
1689 };
1690 let response = service
1691 .try_lock()
1692 .expect("Applications should not have reentrant calls")
1693 .handle_query(argument)?;
1694 self.inner().pop_application();
1695 Ok(response)
1696 }
1697
1698 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1699 let mut this = self.inner();
1700 let application_id = this.current_application().id;
1701
1702 this.scheduled_operations.push(Operation::User {
1703 application_id,
1704 bytes: operation,
1705 });
1706
1707 Ok(())
1708 }
1709
1710 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1711 if let Some(deadline) = self.inner().deadline {
1712 if Instant::now() >= deadline {
1713 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1714 }
1715 }
1716 Ok(())
1717 }
1718}
1719
/// A request received by [`ServiceSyncRuntime::run`].
pub enum ServiceRuntimeRequest {
    /// Run a query against an application's service and send the result back.
    Query {
        /// The application whose service handles the query.
        application_id: ApplicationId,
        /// The context for the query; `run` passes it to `prepare_for_query` first.
        context: QueryContext,
        /// The query bytes handed to the service.
        query: Vec<u8>,
        /// Where `run` sends the query's result, whether success or error.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1729
/// The parts of a [`MessageContext`] that the runtime keeps for the message being executed.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from `MessageContext::is_bouncing`.
    is_bouncing: bool,
    /// Copied from `MessageContext::origin`.
    origin: ChainId,
}
1736
1737impl From<&MessageContext> for ExecutingMessage {
1738 fn from(context: &MessageContext) -> Self {
1739 ExecutingMessage {
1740 is_bouncing: context.is_bouncing,
1741 origin: context.origin,
1742 }
1743 }
1744}
1745
1746pub fn create_bytecode_blobs_sync(
1748 contract: Bytecode,
1749 service: Bytecode,
1750 vm_runtime: VmRuntime,
1751) -> (Vec<Blob>, ModuleId) {
1752 match vm_runtime {
1753 VmRuntime::Wasm => {
1754 let compressed_contract = contract.compress();
1755 let compressed_service = service.compress();
1756 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1757 let service_blob = Blob::new_service_bytecode(compressed_service);
1758 let module_id =
1759 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1760 (vec![contract_blob, service_blob], module_id)
1761 }
1762 VmRuntime::Evm => {
1763 let compressed_contract = contract.compress();
1764 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1765 let module_id = ModuleId::new(
1766 evm_contract_blob.id().hash,
1767 evm_contract_blob.id().hash,
1768 vm_runtime,
1769 );
1770 (vec![evm_contract_blob], module_id)
1771 }
1772 }
1773}