1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29 execution::UserAction,
30 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31 resources::ResourceController,
32 system::CreateApplicationResult,
33 util::{ReceiverExt, UnboundedSenderExt},
34 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
// Unit tests for this module; only compiled for test builds and kept in a separate file.
#[cfg(test)]
#[path = "unit_tests/runtime_tests.rs"]
mod tests;
43
/// Associates a user application instance type with the extra types the runtime keeps for it.
pub trait WithContext {
    /// Extra per-runtime data, stored in `SyncRuntimeInternal::user_context`.
    type UserContext;
    /// The code type stored in `SyncRuntimeInternal::preloaded_applications`, from which
    /// instances are created.
    type Code;
}
48
/// Contract instances carry a [`Timestamp`] as runtime context and are created from
/// [`UserContractCode`].
impl WithContext for UserContractInstance {
    type UserContext = Timestamp;
    type Code = UserContractCode;
}
53
/// Service instances need no extra runtime context and are created from [`UserServiceCode`].
impl WithContext for UserServiceInstance {
    type UserContext = ();
    type Code = UserServiceCode;
}
58
/// Test-only implementation for a type-erased instance, with neither context nor code.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type UserContext = ();
    type Code = ();
}
64
/// Owning wrapper around a [`SyncRuntimeHandle`].
///
/// The inner `Option` is emptied by `into_inner` and by `Drop`; dereferencing the wrapper after
/// that panics.
#[derive(Debug)]
pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
67
/// A [`SyncRuntime`] specialized for contract instances.
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
69
/// Pairs a service [`SyncRuntime`] with a [`QueryContext`].
// NOTE(review): the code that consumes these fields is outside the part of the file reviewed here.
pub struct ServiceSyncRuntime {
    /// The wrapped runtime for service instances.
    runtime: SyncRuntime<UserServiceInstance>,
    /// The query context kept alongside the runtime.
    current_context: QueryContext,
}
74
/// Shared handle to a [`SyncRuntimeInternal`] protected by a mutex.
///
/// Cloning the handle clones the `Arc`, so all clones refer to the same runtime state.
#[derive(Debug)]
pub struct SyncRuntimeHandle<UserInstance: WithContext>(
    Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
);
79
/// A [`SyncRuntimeHandle`] specialized for contract instances.
pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
/// A [`SyncRuntimeHandle`] specialized for service instances.
pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
82
/// The state shared by all handles of a synchronous runtime.
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance: WithContext> {
    /// The chain ID reported by `BaseRuntime::chain_id`.
    chain_id: ChainId,
    /// The block height reported by `BaseRuntime::block_height`; also sent as
    /// `next_block_height` with service oracle queries.
    height: BlockHeight,
    /// The round supplied when the runtime was created, if any.
    round: Option<u32>,
    /// Metadata of the message being executed. Contract runtimes only set this when they are
    /// created for a `UserAction::Message`.
    #[debug(skip_if = Option::is_none)]
    executing_message: Option<ExecutingMessage>,

    /// Channel used to send `ExecutionRequest`s and receive their responses.
    execution_state_sender: ExecutionStateSender,

    /// Set when finalization starts; cross-application calls are rejected while it is `true`.
    is_finalizing: bool,
    /// IDs of the applications instantiated so far, in instantiation order. Finalization
    /// drains this list and walks it in reverse.
    applications_to_finalize: Vec<ApplicationId>,

    /// Code and description of applications, cached by ID, used to create instances.
    preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
    /// Instantiated applications by ID. Cleared when the owning `SyncRuntime` is dropped.
    loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
    /// The applications currently executing; the last entry is the current application.
    call_stack: Vec<ApplicationStatus>,
    /// The IDs of the applications on `call_stack`, used to detect reentrant calls.
    active_applications: HashSet<ApplicationId>,
    /// Operations scheduled during execution.
    // NOTE(review): nothing in the reviewed part of the file fills this list — confirm usage.
    scheduled_operations: Vec<Operation>,

    /// Pending key-value store queries, tracked separately for each application.
    view_user_states: BTreeMap<ApplicationId, ViewUserState>,

    /// Optional execution deadline; contract runtimes are created with `None`.
    // NOTE(review): where the deadline is enforced is not visible here.
    deadline: Option<Instant>,

    /// Account read when sending messages.
    // NOTE(review): presumably the recipient of unused grant refunds — confirm.
    #[debug(skip_if = Option::is_none)]
    refund_grant_to: Option<Account>,
    /// Tracks resource usage (runtime calls, storage reads, fuel, oracle time) and exposes the
    /// resource policy.
    resource_controller: ResourceController,
    /// Instance-kind specific context; see [`WithContext::UserContext`].
    user_context: UserInstance::UserContext,
    /// The value reported by `BaseRuntime::allow_application_logs`.
    allow_application_logs: bool,
}
136
/// An entry of the runtime's call stack: one application that is currently executing.
#[derive(Debug)]
struct ApplicationStatus {
    /// The calling application; only recorded for authenticated cross-application calls.
    caller_id: Option<ApplicationId>,
    /// The ID of the executing application.
    id: ApplicationId,
    /// The description of the executing application.
    description: ApplicationDescription,
    /// The authenticated owner visible to this application, if any.
    signer: Option<AccountOwner>,
}
149
/// An instantiated application together with its description.
#[derive(Debug)]
struct LoadedApplication<Instance> {
    /// The instance, shared and lockable so that clones refer to the same instance.
    instance: Arc<Mutex<Instance>>,
    /// The description of the application.
    description: ApplicationDescription,
}
156
157impl<Instance> LoadedApplication<Instance> {
158 fn new(instance: Instance, description: ApplicationDescription) -> Self {
160 LoadedApplication {
161 instance: Arc::new(Mutex::new(instance)),
162 description,
163 }
164 }
165}
166
167impl<Instance> Clone for LoadedApplication<Instance> {
168 fn clone(&self) -> Self {
171 LoadedApplication {
172 instance: self.instance.clone(),
173 description: self.description.clone(),
174 }
175 }
176}
177
/// A response that is either already available or still has to be received.
#[derive(Debug)]
enum Promise<T> {
    /// The response has been received.
    Ready(T),
    /// The response still has to be read from the receiver.
    Pending(Receiver<T>),
}
183
184impl<T> Promise<T> {
185 fn force(&mut self) -> Result<(), ExecutionError> {
186 if let Promise::Pending(receiver) = self {
187 let value = receiver
188 .recv_ref()
189 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
190 *self = Promise::Ready(value);
191 }
192 Ok(())
193 }
194
195 fn read(self) -> Result<T, ExecutionError> {
196 match self {
197 Promise::Pending(receiver) => {
198 let value = receiver.recv_response()?;
199 Ok(value)
200 }
201 Promise::Ready(value) => Ok(value),
202 }
203 }
204}
205
/// Bookkeeping for the queries of one kind, identified by `u32` IDs.
#[derive(Debug, Default)]
struct QueryManager<T> {
    /// The promises of registered queries that have not been waited on, keyed by their ID.
    pending_queries: BTreeMap<u32, Promise<T>>,
    /// The number of queries registered so far; also the ID given to the next query.
    query_count: u32,
    /// Incremented when a query is registered and decremented when waiting on one succeeds.
    active_query_count: u32,
}
216
217impl<T> QueryManager<T> {
218 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
219 let id = self.query_count;
220 self.pending_queries.insert(id, Promise::Pending(receiver));
221 self.query_count = self
222 .query_count
223 .checked_add(1)
224 .ok_or(ArithmeticError::Overflow)?;
225 self.active_query_count = self
226 .active_query_count
227 .checked_add(1)
228 .ok_or(ArithmeticError::Overflow)?;
229 Ok(id)
230 }
231
232 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
233 let promise = self
234 .pending_queries
235 .remove(&id)
236 .ok_or(ExecutionError::InvalidPromise)?;
237 let value = promise.read()?;
238 self.active_query_count -= 1;
239 Ok(value)
240 }
241
242 fn force_all(&mut self) -> Result<(), ExecutionError> {
243 for promise in self.pending_queries.values_mut() {
244 promise.force()?;
245 }
246 Ok(())
247 }
248}
249
/// A list of keys, as returned by key searches.
type Keys = Vec<Vec<u8>>;
/// The bytes of a stored value.
type Value = Vec<u8>;
/// A list of key-value pairs, as returned by key-value searches.
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
253
/// The pending key-value store queries of one application, with one manager per query kind.
#[derive(Debug, Default)]
struct ViewUserState {
    /// Queries checking whether a single key exists.
    contains_key_queries: QueryManager<bool>,
    /// Queries checking whether each key of a list exists.
    contains_keys_queries: QueryManager<Vec<bool>>,
    /// Queries reading the value of a single key.
    read_value_queries: QueryManager<Option<Value>>,
    /// Queries reading the values of a list of keys.
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    /// Queries searching keys by prefix.
    find_keys_queries: QueryManager<Keys>,
    /// Queries searching key-value pairs by prefix.
    find_key_values_queries: QueryManager<KeyValues>,
}
269
270impl ViewUserState {
271 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
272 self.contains_key_queries.force_all()?;
273 self.contains_keys_queries.force_all()?;
274 self.read_value_queries.force_all()?;
275 self.read_multi_values_queries.force_all()?;
276 self.find_keys_queries.force_all()?;
277 self.find_key_values_queries.force_all()?;
278 Ok(())
279 }
280}
281
282impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
283 type Target = SyncRuntimeHandle<UserInstance>;
284
285 fn deref(&self) -> &Self::Target {
286 self.0.as_ref().expect(
287 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288 )
289 }
290}
291
292impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
293 fn deref_mut(&mut self) -> &mut Self::Target {
294 self.0.as_mut().expect(
295 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
296 )
297 }
298}
299
300impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
301 fn drop(&mut self) {
302 if let Some(handle) = self.0.take() {
305 handle.inner().loaded_applications.clear();
306 }
307 }
308}
309
310impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
311 #[expect(clippy::too_many_arguments)]
312 fn new(
313 chain_id: ChainId,
314 height: BlockHeight,
315 round: Option<u32>,
316 executing_message: Option<ExecutingMessage>,
317 execution_state_sender: ExecutionStateSender,
318 deadline: Option<Instant>,
319 refund_grant_to: Option<Account>,
320 resource_controller: ResourceController,
321 user_context: UserInstance::UserContext,
322 allow_application_logs: bool,
323 ) -> Self {
324 Self {
325 chain_id,
326 height,
327 round,
328 executing_message,
329 execution_state_sender,
330 is_finalizing: false,
331 applications_to_finalize: Vec::new(),
332 preloaded_applications: HashMap::new(),
333 loaded_applications: HashMap::new(),
334 call_stack: Vec::new(),
335 active_applications: HashSet::new(),
336 view_user_states: BTreeMap::new(),
337 deadline,
338 refund_grant_to,
339 resource_controller,
340 scheduled_operations: Vec::new(),
341 user_context,
342 allow_application_logs,
343 }
344 }
345
346 fn current_application(&self) -> &ApplicationStatus {
354 self.call_stack
355 .last()
356 .expect("Call stack is unexpectedly empty")
357 }
358
359 fn push_application(&mut self, status: ApplicationStatus) {
363 self.active_applications.insert(status.id);
364 self.call_stack.push(status);
365 }
366
367 fn pop_application(&mut self) -> ApplicationStatus {
375 let status = self
376 .call_stack
377 .pop()
378 .expect("Can't remove application from empty call stack");
379 assert!(self.active_applications.remove(&status.id));
380 status
381 }
382
383 fn check_for_reentrancy(
387 &mut self,
388 application_id: ApplicationId,
389 ) -> Result<(), ExecutionError> {
390 ensure!(
391 !self.active_applications.contains(&application_id),
392 ExecutionError::ReentrantCall(application_id)
393 );
394 Ok(())
395 }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
/// Returns the loaded contract `id`, instantiating it first if necessary.
///
/// `this` is the runtime handle given to a newly created instance. A newly instantiated
/// contract is also recorded in `applications_to_finalize`.
fn load_contract_instance(
    &mut self,
    this: SyncRuntimeHandle<UserContractInstance>,
    id: ApplicationId,
) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
    match self.loaded_applications.entry(id) {
        // Already instantiated: hand out another reference to the same instance.
        hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),

        hash_map::Entry::Vacant(entry) => {
            let (code, description) = match self.preloaded_applications.entry(id) {
                // On the web, only preloaded applications can be instantiated.
                #[cfg(web)]
                hash_map::Entry::Vacant(_) => {
                    drop(this);
                    return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
                        id,
                    )));
                }
                // Elsewhere, request the code from the execution state and cache it.
                #[cfg(not(web))]
                hash_map::Entry::Vacant(entry) => {
                    let (code, description) = self
                        .execution_state_sender
                        .send_request(move |callback| ExecutionRequest::LoadContract {
                            id,
                            callback,
                        })?
                        .recv_response()?;
                    entry.insert((code, description)).clone()
                }
                hash_map::Entry::Occupied(entry) => entry.get().clone(),
            };
            let instance = code.instantiate(this)?;

            // Every instantiated contract is finalized at the end of the execution.
            self.applications_to_finalize.push(id);
            Ok(entry
                .insert(LoadedApplication::new(instance, description))
                .clone())
        }
    }
}
442
443 fn prepare_for_call(
445 &mut self,
446 this: ContractSyncRuntimeHandle,
447 authenticated: bool,
448 callee_id: ApplicationId,
449 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
450 self.check_for_reentrancy(callee_id)?;
451
452 ensure!(
453 !self.is_finalizing,
454 ExecutionError::CrossApplicationCallInFinalize {
455 caller_id: Box::new(self.current_application().id),
456 callee_id: Box::new(callee_id),
457 }
458 );
459
460 let application = self.load_contract_instance(this, callee_id)?;
462
463 let caller = self.current_application();
464 let caller_id = caller.id;
465 let caller_signer = caller.signer;
466 let authenticated_owner = match caller_signer {
468 Some(signer) if authenticated => Some(signer),
469 _ => None,
470 };
471 let authenticated_caller_id = authenticated.then_some(caller_id);
472 self.push_application(ApplicationStatus {
473 caller_id: authenticated_caller_id,
474 id: callee_id,
475 description: application.description,
476 signer: authenticated_owner,
478 });
479 Ok(application.instance)
480 }
481
482 fn finish_call(&mut self) {
484 self.pop_application();
485 }
486
487 fn run_service_oracle_query(
489 &mut self,
490 application_id: ApplicationId,
491 query: Vec<u8>,
492 ) -> Result<Vec<u8>, ExecutionError> {
493 let timeout = self
494 .resource_controller
495 .remaining_service_oracle_execution_time()?;
496 let execution_start = Instant::now();
497 let deadline = Some(execution_start + timeout);
498 let response = self
499 .execution_state_sender
500 .send_request(|callback| ExecutionRequest::QueryServiceOracle {
501 deadline,
502 application_id,
503 next_block_height: self.height,
504 query,
505 callback,
506 })?
507 .recv_response()?;
508
509 self.resource_controller
510 .track_service_oracle_execution(execution_start.elapsed())?;
511 self.resource_controller
512 .track_service_oracle_response(response.len())?;
513
514 Ok(response)
515 }
516}
517
impl SyncRuntimeInternal<UserServiceInstance> {
    /// Returns the loaded service `id`, instantiating it first if necessary.
    ///
    /// `this` is the runtime handle given to a newly created instance. A newly instantiated
    /// service is also recorded in `applications_to_finalize`.
    fn load_service_instance(
        &mut self,
        this: SyncRuntimeHandle<UserServiceInstance>,
        id: ApplicationId,
    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
        match self.loaded_applications.entry(id) {
            // Already instantiated: hand out another reference to the same instance.
            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),

            hash_map::Entry::Vacant(entry) => {
                let (code, description) = match self.preloaded_applications.entry(id) {
                    // On the web, only preloaded applications can be instantiated.
                    #[cfg(web)]
                    hash_map::Entry::Vacant(_) => {
                        drop(this);
                        return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
                            id,
                        )));
                    }
                    // Elsewhere, request the code from the execution state and cache it.
                    #[cfg(not(web))]
                    hash_map::Entry::Vacant(entry) => {
                        let (code, description) = self
                            .execution_state_sender
                            .send_request(move |callback| ExecutionRequest::LoadService {
                                id,
                                callback,
                            })?
                            .recv_response()?;
                        entry.insert((code, description)).clone()
                    }
                    hash_map::Entry::Occupied(entry) => entry.get().clone(),
                };
                let instance = code.instantiate(this)?;

                self.applications_to_finalize.push(id);
                Ok(entry
                    .insert(LoadedApplication::new(instance, description))
                    .clone())
            }
        }
    }
}
563
564impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
565 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
566 let handle = self.0.take().expect(
567 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
568 );
569 let runtime = Arc::into_inner(handle.0)?
570 .into_inner()
571 .expect("`SyncRuntime` should run in a single thread which should not panic");
572 Some(runtime)
573 }
574}
575
576impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
577 for SyncRuntimeHandle<UserInstance>
578{
579 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
580 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
581 }
582}
583
584impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
585 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
586 self.0
587 .try_lock()
588 .expect("Synchronous runtimes run on a single execution thread")
589 }
590}
591
592impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
593where
594 Self: ContractOrServiceRuntime,
595{
// Promise types handed out by the `*_new` methods and redeemed by the matching `*_wait`
// methods. The `u32` ones are the IDs returned by `QueryManager::register`.
type Read = ();
type ReadValueBytes = u32;
type ContainsKey = u32;
type ContainsKeys = u32;
type ReadMultiValuesBytes = u32;
type FindKeysByPrefix = u32;
type FindKeyValuesByPrefix = u32;
603
604 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
605 let mut this = self.inner();
606 let chain_id = this.chain_id;
607 this.resource_controller.track_runtime_chain_id()?;
608 Ok(chain_id)
609 }
610
611 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
612 let mut this = self.inner();
613 let height = this.height;
614 this.resource_controller.track_runtime_block_height()?;
615 Ok(height)
616 }
617
618 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
619 let mut this = self.inner();
620 let application_id = this.current_application().id;
621 this.resource_controller.track_runtime_application_id()?;
622 Ok(application_id)
623 }
624
625 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
626 let mut this = self.inner();
627 let application_creator_chain_id = this.current_application().description.creator_chain_id;
628 this.resource_controller.track_runtime_application_id()?;
629 Ok(application_creator_chain_id)
630 }
631
632 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
633 let mut this = self.inner();
634 let parameters = this.current_application().description.parameters.clone();
635 this.resource_controller
636 .track_runtime_application_parameters(¶meters)?;
637 Ok(parameters)
638 }
639
640 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
641 let mut this = self.inner();
642 let timestamp = this
643 .execution_state_sender
644 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
645 .recv_response()?;
646 this.resource_controller.track_runtime_timestamp()?;
647 Ok(timestamp)
648 }
649
650 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
651 let mut this = self.inner();
652 let balance = this
653 .execution_state_sender
654 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
655 .recv_response()?;
656 this.resource_controller.track_runtime_balance()?;
657 Ok(balance)
658 }
659
660 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
661 let mut this = self.inner();
662 let balance = this
663 .execution_state_sender
664 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
665 .recv_response()?;
666 this.resource_controller.track_runtime_balance()?;
667 Ok(balance)
668 }
669
670 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
671 let mut this = self.inner();
672 let owner_balances = this
673 .execution_state_sender
674 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
675 .recv_response()?;
676 this.resource_controller
677 .track_runtime_owner_balances(&owner_balances)?;
678 Ok(owner_balances)
679 }
680
681 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
682 let mut this = self.inner();
683 let owners = this
684 .execution_state_sender
685 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
686 .recv_response()?;
687 this.resource_controller.track_runtime_owners(&owners)?;
688 Ok(owners)
689 }
690
691 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
692 let mut this = self.inner();
693 let chain_ownership = this
694 .execution_state_sender
695 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
696 .recv_response()?;
697 this.resource_controller
698 .track_runtime_chain_ownership(&chain_ownership)?;
699 Ok(chain_ownership)
700 }
701
702 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
703 let mut this = self.inner();
704 let id = this.current_application().id;
705 this.resource_controller.track_read_operation()?;
706 let receiver = this
707 .execution_state_sender
708 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
709 let state = this.view_user_states.entry(id).or_default();
710 state.contains_key_queries.register(receiver)
711 }
712
713 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
714 let mut this = self.inner();
715 let id = this.current_application().id;
716 let state = this.view_user_states.entry(id).or_default();
717 let value = state.contains_key_queries.wait(*promise)?;
718 Ok(value)
719 }
720
721 fn contains_keys_new(
722 &mut self,
723 keys: Vec<Vec<u8>>,
724 ) -> Result<Self::ContainsKeys, ExecutionError> {
725 let mut this = self.inner();
726 let id = this.current_application().id;
727 this.resource_controller.track_read_operation()?;
728 let receiver = this
729 .execution_state_sender
730 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
731 let state = this.view_user_states.entry(id).or_default();
732 state.contains_keys_queries.register(receiver)
733 }
734
735 fn contains_keys_wait(
736 &mut self,
737 promise: &Self::ContainsKeys,
738 ) -> Result<Vec<bool>, ExecutionError> {
739 let mut this = self.inner();
740 let id = this.current_application().id;
741 let state = this.view_user_states.entry(id).or_default();
742 let value = state.contains_keys_queries.wait(*promise)?;
743 Ok(value)
744 }
745
746 fn read_multi_values_bytes_new(
747 &mut self,
748 keys: Vec<Vec<u8>>,
749 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
750 let mut this = self.inner();
751 let id = this.current_application().id;
752 this.resource_controller.track_read_operation()?;
753 let receiver = this.execution_state_sender.send_request(move |callback| {
754 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
755 })?;
756 let state = this.view_user_states.entry(id).or_default();
757 state.read_multi_values_queries.register(receiver)
758 }
759
760 fn read_multi_values_bytes_wait(
761 &mut self,
762 promise: &Self::ReadMultiValuesBytes,
763 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
764 let mut this = self.inner();
765 let id = this.current_application().id;
766 let state = this.view_user_states.entry(id).or_default();
767 let values = state.read_multi_values_queries.wait(*promise)?;
768 for value in &values {
769 if let Some(value) = &value {
770 this.resource_controller
771 .track_bytes_read(value.len() as u64)?;
772 }
773 }
774 Ok(values)
775 }
776
777 fn read_value_bytes_new(
778 &mut self,
779 key: Vec<u8>,
780 ) -> Result<Self::ReadValueBytes, ExecutionError> {
781 let mut this = self.inner();
782 let id = this.current_application().id;
783 this.resource_controller.track_read_operation()?;
784 let receiver = this
785 .execution_state_sender
786 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
787 let state = this.view_user_states.entry(id).or_default();
788 state.read_value_queries.register(receiver)
789 }
790
791 fn read_value_bytes_wait(
792 &mut self,
793 promise: &Self::ReadValueBytes,
794 ) -> Result<Option<Vec<u8>>, ExecutionError> {
795 let mut this = self.inner();
796 let id = this.current_application().id;
797 let value = {
798 let state = this.view_user_states.entry(id).or_default();
799 state.read_value_queries.wait(*promise)?
800 };
801 if let Some(value) = &value {
802 this.resource_controller
803 .track_bytes_read(value.len() as u64)?;
804 }
805 Ok(value)
806 }
807
808 fn find_keys_by_prefix_new(
809 &mut self,
810 key_prefix: Vec<u8>,
811 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
812 let mut this = self.inner();
813 let id = this.current_application().id;
814 this.resource_controller.track_read_operation()?;
815 let receiver = this.execution_state_sender.send_request(move |callback| {
816 ExecutionRequest::FindKeysByPrefix {
817 id,
818 key_prefix,
819 callback,
820 }
821 })?;
822 let state = this.view_user_states.entry(id).or_default();
823 state.find_keys_queries.register(receiver)
824 }
825
826 fn find_keys_by_prefix_wait(
827 &mut self,
828 promise: &Self::FindKeysByPrefix,
829 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
830 let mut this = self.inner();
831 let id = this.current_application().id;
832 let keys = {
833 let state = this.view_user_states.entry(id).or_default();
834 state.find_keys_queries.wait(*promise)?
835 };
836 let mut read_size = 0;
837 for key in &keys {
838 read_size += key.len();
839 }
840 this.resource_controller
841 .track_bytes_read(read_size as u64)?;
842 Ok(keys)
843 }
844
845 fn find_key_values_by_prefix_new(
846 &mut self,
847 key_prefix: Vec<u8>,
848 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
849 let mut this = self.inner();
850 let id = this.current_application().id;
851 this.resource_controller.track_read_operation()?;
852 let receiver = this.execution_state_sender.send_request(move |callback| {
853 ExecutionRequest::FindKeyValuesByPrefix {
854 id,
855 key_prefix,
856 callback,
857 }
858 })?;
859 let state = this.view_user_states.entry(id).or_default();
860 state.find_key_values_queries.register(receiver)
861 }
862
863 fn find_key_values_by_prefix_wait(
864 &mut self,
865 promise: &Self::FindKeyValuesByPrefix,
866 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
867 let mut this = self.inner();
868 let id = this.current_application().id;
869 let state = this.view_user_states.entry(id).or_default();
870 let key_values = state.find_key_values_queries.wait(*promise)?;
871 let mut read_size = 0;
872 for (key, value) in &key_values {
873 read_size += key.len() + value.len();
874 }
875 this.resource_controller
876 .track_bytes_read(read_size as u64)?;
877 Ok(key_values)
878 }
879
880 fn perform_http_request(
881 &mut self,
882 request: http::Request,
883 ) -> Result<http::Response, ExecutionError> {
884 let mut this = self.inner();
885 let app_permissions = this
886 .execution_state_sender
887 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
888 .recv_response()?;
889
890 let app_id = this.current_application().id;
891 ensure!(
892 app_permissions.can_make_http_requests(&app_id),
893 ExecutionError::UnauthorizedApplication(app_id)
894 );
895
896 this.resource_controller.track_http_request()?;
897
898 this.execution_state_sender
899 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
900 request,
901 http_responses_are_oracle_responses:
902 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
903 callback,
904 })?
905 .recv_response()
906 }
907
908 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
909 let this = self.inner();
910 this.execution_state_sender
911 .send_request(|callback| ExecutionRequest::AssertBefore {
912 timestamp,
913 callback,
914 })?
915 .recv_response()?
916 }
917
918 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
919 let this = self.inner();
920 let blob_id = hash.into();
921 let content = this
922 .execution_state_sender
923 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
924 .recv_response()?;
925 Ok(content.into_vec_or_clone())
926 }
927
928 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
929 let this = self.inner();
930 let blob_id = hash.into();
931 this.execution_state_sender
932 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
933 .recv_response()
934 }
935
936 fn has_empty_storage(&mut self, application: ApplicationId) -> Result<bool, ExecutionError> {
937 let this = self.inner();
938 let (key_size, value_size) = this
939 .execution_state_sender
940 .send_request(move |callback| ExecutionRequest::TotalStorageSize {
941 application,
942 callback,
943 })?
944 .recv_response()?;
945 Ok(key_size + value_size == 0)
946 }
947
948 fn maximum_blob_size(&mut self) -> Result<u64, ExecutionError> {
949 Ok(self.inner().resource_controller.policy().maximum_blob_size)
950 }
951
952 fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
953 Ok(self.inner().allow_application_logs)
954 }
955}
956
/// Settings that differ between contract and service runtime handles.
trait ContractOrServiceRuntime {
    /// The value sent as `http_responses_are_oracle_responses` with HTTP requests.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
967
/// For contracts, HTTP responses are flagged as oracle responses.
impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
}
971
/// For services, HTTP responses are not flagged as oracle responses.
impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
}
975
976impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
977 fn clone(&self) -> Self {
978 SyncRuntimeHandle(self.0.clone())
979 }
980}
981
982impl ContractSyncRuntime {
983 pub(crate) fn new(
984 execution_state_sender: ExecutionStateSender,
985 chain_id: ChainId,
986 refund_grant_to: Option<Account>,
987 resource_controller: ResourceController,
988 action: &UserAction,
989 allow_application_logs: bool,
990 ) -> Self {
991 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
992 SyncRuntimeInternal::new(
993 chain_id,
994 action.height(),
995 action.round(),
996 if let UserAction::Message(context, _) = action {
997 Some(context.into())
998 } else {
999 None
1000 },
1001 execution_state_sender,
1002 None,
1003 refund_grant_to,
1004 resource_controller,
1005 action.timestamp(),
1006 allow_application_logs,
1007 ),
1008 )))
1009 }
1010
1011 pub(crate) fn preload_contract(
1013 &self,
1014 id: ApplicationId,
1015 code: UserContractCode,
1016 description: ApplicationDescription,
1017 ) -> Result<(), ExecutionError> {
1018 let this = self
1019 .0
1020 .as_ref()
1021 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1022 let mut this_guard = this.inner();
1023
1024 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1025 entry.insert((code, description));
1026 }
1027
1028 Ok(())
1029 }
1030
1031 pub(crate) fn run_action(
1033 mut self,
1034 application_id: ApplicationId,
1035 chain_id: ChainId,
1036 action: UserAction,
1037 ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1038 let result = self
1039 .deref_mut()
1040 .run_action(application_id, chain_id, action)?;
1041 let runtime = self
1042 .into_inner()
1043 .expect("Runtime clones should have been freed by now");
1044
1045 Ok((result, runtime.resource_controller))
1046 }
1047}
1048
1049impl ContractSyncRuntimeHandle {
1050 fn run_action(
1051 &mut self,
1052 application_id: ApplicationId,
1053 chain_id: ChainId,
1054 action: UserAction,
1055 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1056 let finalize_context = FinalizeContext {
1057 authenticated_owner: action.signer(),
1058 chain_id,
1059 height: action.height(),
1060 round: action.round(),
1061 };
1062
1063 {
1064 let runtime = self.inner();
1065 assert_eq!(runtime.chain_id, chain_id);
1066 assert_eq!(runtime.height, action.height());
1067 }
1068
1069 let signer = action.signer();
1070 let closure = move |code: &mut UserContractInstance| match action {
1071 UserAction::Instantiate(_context, argument) => {
1072 code.instantiate(argument).map(|()| None)
1073 }
1074 UserAction::Operation(_context, operation) => {
1075 code.execute_operation(operation).map(Option::Some)
1076 }
1077 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1078 UserAction::ProcessStreams(_context, updates) => {
1079 code.process_streams(updates).map(|()| None)
1080 }
1081 };
1082
1083 let result = self.execute(application_id, signer, closure)?;
1084 self.finalize(finalize_context)?;
1085 Ok(result)
1086 }
1087
1088 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1090 let applications = mem::take(&mut self.inner().applications_to_finalize)
1091 .into_iter()
1092 .rev();
1093
1094 self.inner().is_finalizing = true;
1095
1096 for application in applications {
1097 self.execute(application, context.authenticated_owner, |contract| {
1098 contract.finalize().map(|_| None)
1099 })?;
1100 self.inner().loaded_applications.remove(&application);
1101 }
1102
1103 Ok(())
1104 }
1105
1106 fn execute(
1108 &mut self,
1109 application_id: ApplicationId,
1110 signer: Option<AccountOwner>,
1111 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1112 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1113 let contract = {
1114 let mut runtime = self.inner();
1115 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1116
1117 let status = ApplicationStatus {
1118 caller_id: None,
1119 id: application_id,
1120 description: application.description.clone(),
1121 signer,
1122 };
1123
1124 runtime.push_application(status);
1125
1126 application
1127 };
1128
1129 let result = closure(
1130 &mut contract
1131 .instance
1132 .try_lock()
1133 .expect("Application should not be already executing"),
1134 )?;
1135
1136 let mut runtime = self.inner();
1137 let application_status = runtime.pop_application();
1138 assert_eq!(application_status.caller_id, None);
1139 assert_eq!(application_status.id, application_id);
1140 assert_eq!(application_status.description, contract.description);
1141 assert_eq!(application_status.signer, signer);
1142 assert!(runtime.call_stack.is_empty());
1143
1144 Ok(result)
1145 }
1146}
1147
1148impl ContractRuntime for ContractSyncRuntimeHandle {
1149 fn authenticated_owner(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1150 let this = self.inner();
1151 Ok(this.current_application().signer)
1152 }
1153
1154 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1155 Ok(self
1156 .inner()
1157 .executing_message
1158 .map(|metadata| metadata.is_bouncing))
1159 }
1160
1161 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1162 Ok(self
1163 .inner()
1164 .executing_message
1165 .map(|metadata| metadata.origin))
1166 }
1167
1168 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1169 let this = self.inner();
1170 if this.call_stack.len() <= 1 {
1171 return Ok(None);
1172 }
1173 Ok(this.current_application().caller_id)
1174 }
1175
1176 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1177 Ok(match vm_runtime {
1178 VmRuntime::Wasm => {
1179 self.inner()
1180 .resource_controller
1181 .policy()
1182 .maximum_wasm_fuel_per_block
1183 }
1184 VmRuntime::Evm => {
1185 self.inner()
1186 .resource_controller
1187 .policy()
1188 .maximum_evm_fuel_per_block
1189 }
1190 })
1191 }
1192
1193 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1194 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1195 }
1196
1197 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1198 let mut this = self.inner();
1199 this.resource_controller.track_fuel(fuel, vm_runtime)
1200 }
1201
1202 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1203 let mut this = self.inner();
1204 let application = this.current_application();
1205 let application_id = application.id;
1206 let authenticated_owner = application.signer;
1207 let mut refund_grant_to = this.refund_grant_to;
1208
1209 let grant = this
1210 .resource_controller
1211 .policy()
1212 .total_price(&message.grant)?;
1213 if grant.is_zero() {
1214 refund_grant_to = None;
1215 } else {
1216 this.resource_controller.track_grant(grant)?;
1217 }
1218 let kind = if message.is_tracked {
1219 MessageKind::Tracked
1220 } else {
1221 MessageKind::Simple
1222 };
1223
1224 this.execution_state_sender
1225 .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1226 message: OutgoingMessage {
1227 destination: message.destination,
1228 authenticated_owner,
1229 refund_grant_to,
1230 grant,
1231 kind,
1232 message: Message::User {
1233 application_id,
1234 bytes: message.message,
1235 },
1236 },
1237 callback,
1238 })?
1239 .recv_response()?;
1240
1241 Ok(())
1242 }
1243
1244 fn transfer(
1245 &mut self,
1246 source: AccountOwner,
1247 destination: Account,
1248 amount: Amount,
1249 ) -> Result<(), ExecutionError> {
1250 let this = self.inner();
1251 let current_application = this.current_application();
1252 let application_id = current_application.id;
1253 let signer = current_application.signer;
1254
1255 this.execution_state_sender
1256 .send_request(|callback| ExecutionRequest::Transfer {
1257 source,
1258 destination,
1259 amount,
1260 signer,
1261 application_id,
1262 callback,
1263 })?
1264 .recv_response()?;
1265 Ok(())
1266 }
1267
1268 fn claim(
1269 &mut self,
1270 source: Account,
1271 destination: Account,
1272 amount: Amount,
1273 ) -> Result<(), ExecutionError> {
1274 let this = self.inner();
1275 let current_application = this.current_application();
1276 let application_id = current_application.id;
1277 let signer = current_application.signer;
1278
1279 this.execution_state_sender
1280 .send_request(|callback| ExecutionRequest::Claim {
1281 source,
1282 destination,
1283 amount,
1284 signer,
1285 application_id,
1286 callback,
1287 })?
1288 .recv_response()?;
1289 Ok(())
1290 }
1291
1292 fn try_call_application(
1293 &mut self,
1294 authenticated: bool,
1295 callee_id: ApplicationId,
1296 argument: Vec<u8>,
1297 ) -> Result<Vec<u8>, ExecutionError> {
1298 let contract = self
1299 .inner()
1300 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1301
1302 let value = contract
1303 .try_lock()
1304 .expect("Applications should not have reentrant calls")
1305 .execute_operation(argument)?;
1306
1307 self.inner().finish_call();
1308
1309 Ok(value)
1310 }
1311
1312 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1313 let mut this = self.inner();
1314 ensure!(
1315 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1316 ExecutionError::StreamNameTooLong
1317 );
1318 let application_id = GenericApplicationId::User(this.current_application().id);
1319 let stream_id = StreamId {
1320 stream_name,
1321 application_id,
1322 };
1323 let value_len = value.len() as u64;
1324 let index = this
1325 .execution_state_sender
1326 .send_request(|callback| ExecutionRequest::Emit {
1327 stream_id,
1328 value,
1329 callback,
1330 })?
1331 .recv_response()?;
1332 this.resource_controller.track_bytes_written(value_len)?;
1334 Ok(index)
1335 }
1336
1337 fn read_event(
1338 &mut self,
1339 chain_id: ChainId,
1340 stream_name: StreamName,
1341 index: u32,
1342 ) -> Result<Vec<u8>, ExecutionError> {
1343 let mut this = self.inner();
1344 ensure!(
1345 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1346 ExecutionError::StreamNameTooLong
1347 );
1348 let application_id = GenericApplicationId::User(this.current_application().id);
1349 let stream_id = StreamId {
1350 stream_name,
1351 application_id,
1352 };
1353 let event_id = EventId {
1354 stream_id,
1355 index,
1356 chain_id,
1357 };
1358 let event = this
1359 .execution_state_sender
1360 .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1361 .recv_response()?;
1362 this.resource_controller
1364 .track_bytes_read(event.len() as u64)?;
1365 Ok(event)
1366 }
1367
1368 fn subscribe_to_events(
1369 &mut self,
1370 chain_id: ChainId,
1371 application_id: ApplicationId,
1372 stream_name: StreamName,
1373 ) -> Result<(), ExecutionError> {
1374 let this = self.inner();
1375 ensure!(
1376 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1377 ExecutionError::StreamNameTooLong
1378 );
1379 let stream_id = StreamId {
1380 stream_name,
1381 application_id: application_id.into(),
1382 };
1383 let subscriber_app_id = this.current_application().id;
1384 this.execution_state_sender
1385 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1386 chain_id,
1387 stream_id,
1388 subscriber_app_id,
1389 callback,
1390 })?
1391 .recv_response()?;
1392 Ok(())
1393 }
1394
1395 fn unsubscribe_from_events(
1396 &mut self,
1397 chain_id: ChainId,
1398 application_id: ApplicationId,
1399 stream_name: StreamName,
1400 ) -> Result<(), ExecutionError> {
1401 let this = self.inner();
1402 ensure!(
1403 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1404 ExecutionError::StreamNameTooLong
1405 );
1406 let stream_id = StreamId {
1407 stream_name,
1408 application_id: application_id.into(),
1409 };
1410 let subscriber_app_id = this.current_application().id;
1411 this.execution_state_sender
1412 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1413 chain_id,
1414 stream_id,
1415 subscriber_app_id,
1416 callback,
1417 })?
1418 .recv_response()?;
1419 Ok(())
1420 }
1421
1422 fn query_service(
1423 &mut self,
1424 application_id: ApplicationId,
1425 query: Vec<u8>,
1426 ) -> Result<Vec<u8>, ExecutionError> {
1427 let mut this = self.inner();
1428
1429 let app_permissions = this
1430 .execution_state_sender
1431 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1432 .recv_response()?;
1433
1434 let app_id = this.current_application().id;
1435 ensure!(
1436 app_permissions.can_call_services(&app_id),
1437 ExecutionError::UnauthorizedApplication(app_id)
1438 );
1439
1440 this.resource_controller.track_service_oracle_call()?;
1441
1442 this.run_service_oracle_query(application_id, query)
1443 }
1444
1445 fn open_chain(
1446 &mut self,
1447 ownership: ChainOwnership,
1448 application_permissions: ApplicationPermissions,
1449 balance: Amount,
1450 ) -> Result<ChainId, ExecutionError> {
1451 let parent_id = self.inner().chain_id;
1452 let block_height = self.block_height()?;
1453
1454 let timestamp = self.inner().user_context;
1455
1456 let chain_id = self
1457 .inner()
1458 .execution_state_sender
1459 .send_request(|callback| ExecutionRequest::OpenChain {
1460 ownership,
1461 balance,
1462 parent_id,
1463 block_height,
1464 timestamp,
1465 application_permissions,
1466 callback,
1467 })?
1468 .recv_response()?;
1469
1470 Ok(chain_id)
1471 }
1472
1473 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1474 let this = self.inner();
1475 let application_id = this.current_application().id;
1476 this.execution_state_sender
1477 .send_request(|callback| ExecutionRequest::CloseChain {
1478 application_id,
1479 callback,
1480 })?
1481 .recv_response()?
1482 }
1483
1484 fn change_application_permissions(
1485 &mut self,
1486 application_permissions: ApplicationPermissions,
1487 ) -> Result<(), ExecutionError> {
1488 let this = self.inner();
1489 let application_id = this.current_application().id;
1490 this.execution_state_sender
1491 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1492 application_id,
1493 application_permissions,
1494 callback,
1495 })?
1496 .recv_response()?
1497 }
1498
1499 fn peek_application_index(&mut self) -> Result<u32, ExecutionError> {
1500 let index = self
1501 .inner()
1502 .execution_state_sender
1503 .send_request(move |callback| ExecutionRequest::PeekApplicationIndex { callback })?
1504 .recv_response()?;
1505 Ok(index)
1506 }
1507
1508 fn create_application(
1509 &mut self,
1510 module_id: ModuleId,
1511 parameters: Vec<u8>,
1512 argument: Vec<u8>,
1513 required_application_ids: Vec<ApplicationId>,
1514 ) -> Result<ApplicationId, ExecutionError> {
1515 let chain_id = self.inner().chain_id;
1516 let block_height = self.block_height()?;
1517
1518 let CreateApplicationResult { app_id } = self
1519 .inner()
1520 .execution_state_sender
1521 .send_request(move |callback| ExecutionRequest::CreateApplication {
1522 chain_id,
1523 block_height,
1524 module_id,
1525 parameters,
1526 required_application_ids,
1527 callback,
1528 })?
1529 .recv_response()?;
1530
1531 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1532
1533 contract
1534 .try_lock()
1535 .expect("Applications should not have reentrant calls")
1536 .instantiate(argument)?;
1537
1538 self.inner().finish_call();
1539
1540 Ok(app_id)
1541 }
1542
1543 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1544 let blob = Blob::new_data(bytes);
1545 let blob_id = blob.id();
1546 let this = self.inner();
1547 this.execution_state_sender
1548 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1549 .recv_response()?;
1550 Ok(DataBlobHash(blob_id.hash))
1551 }
1552
1553 fn publish_module(
1554 &mut self,
1555 contract: Bytecode,
1556 service: Bytecode,
1557 vm_runtime: VmRuntime,
1558 ) -> Result<ModuleId, ExecutionError> {
1559 let (blobs, module_id) =
1560 crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1561 let this = self.inner();
1562 for blob in blobs {
1563 this.execution_state_sender
1564 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1565 .recv_response()?;
1566 }
1567 Ok(module_id)
1568 }
1569
1570 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1571 let this = self.inner();
1572 let round = this.round;
1573 this.execution_state_sender
1574 .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1575 .recv_response()
1576 }
1577
1578 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1579 let mut this = self.inner();
1580 let id = this.current_application().id;
1581 let state = this.view_user_states.entry(id).or_default();
1582 state.force_all_pending_queries()?;
1583 this.resource_controller.track_write_operations(
1584 batch
1585 .num_operations()
1586 .try_into()
1587 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1588 )?;
1589 this.resource_controller
1590 .track_bytes_written(batch.size() as u64)?;
1591 this.execution_state_sender
1592 .send_request(|callback| ExecutionRequest::WriteBatch {
1593 id,
1594 batch,
1595 callback,
1596 })?
1597 .recv_response()?;
1598 Ok(())
1599 }
1600}
1601
1602impl ServiceSyncRuntime {
1603 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1605 Self::new_with_deadline(execution_state_sender, context, None)
1606 }
1607
1608 pub fn new_with_deadline(
1610 execution_state_sender: ExecutionStateSender,
1611 context: QueryContext,
1612 deadline: Option<Instant>,
1613 ) -> Self {
1614 let allow_application_logs = execution_state_sender
1616 .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1617 .ok()
1618 .and_then(|receiver| receiver.recv_response().ok())
1619 .unwrap_or(false);
1620
1621 let runtime = SyncRuntime(Some(
1622 SyncRuntimeInternal::new(
1623 context.chain_id,
1624 context.next_block_height,
1625 None,
1626 None,
1627 execution_state_sender,
1628 deadline,
1629 None,
1630 ResourceController::default(),
1631 (),
1632 allow_application_logs,
1633 )
1634 .into(),
1635 ));
1636
1637 ServiceSyncRuntime {
1638 runtime,
1639 current_context: context,
1640 }
1641 }
1642
1643 pub(crate) fn preload_service(
1645 &self,
1646 id: ApplicationId,
1647 code: UserServiceCode,
1648 description: ApplicationDescription,
1649 ) -> Result<(), ExecutionError> {
1650 let this = self
1651 .runtime
1652 .0
1653 .as_ref()
1654 .expect("services shouldn't be preloaded while the runtime is being dropped");
1655 let mut this_guard = this.inner();
1656
1657 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1658 entry.insert((code, description));
1659 }
1660
1661 Ok(())
1662 }
1663
1664 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1666 while let Ok(request) = incoming_requests.recv() {
1667 let ServiceRuntimeRequest::Query {
1668 application_id,
1669 context,
1670 query,
1671 callback,
1672 } = request;
1673
1674 let result = self
1675 .prepare_for_query(context)
1676 .and_then(|()| self.run_query(application_id, query));
1677
1678 if let Err(err) = callback.send(result) {
1679 tracing::debug!(%err, "Receiver for query result has been dropped");
1680 }
1681 }
1682 }
1683
1684 pub(crate) fn prepare_for_query(
1686 &mut self,
1687 new_context: QueryContext,
1688 ) -> Result<(), ExecutionError> {
1689 let expected_context = QueryContext {
1690 local_time: new_context.local_time,
1691 ..self.current_context
1692 };
1693
1694 if new_context != expected_context {
1695 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1696 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1697 } else {
1698 self.handle_mut()
1699 .inner()
1700 .execution_state_sender
1701 .send_request(|callback| ExecutionRequest::SetLocalTime {
1702 local_time: new_context.local_time,
1703 callback,
1704 })?
1705 .recv_response()?;
1706 }
1707 Ok(())
1708 }
1709
1710 pub(crate) fn run_query(
1712 &mut self,
1713 application_id: ApplicationId,
1714 query: Vec<u8>,
1715 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1716 let this = self.handle_mut();
1717 let response = this.try_query_application(application_id, query)?;
1718 let operations = mem::take(&mut this.inner().scheduled_operations);
1719
1720 Ok(QueryOutcome {
1721 response,
1722 operations,
1723 })
1724 }
1725
1726 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1728 self.runtime.0.as_mut().expect(
1729 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1730 )
1731 }
1732}
1733
1734impl ServiceRuntime for ServiceSyncRuntimeHandle {
1735 fn try_query_application(
1737 &mut self,
1738 queried_id: ApplicationId,
1739 argument: Vec<u8>,
1740 ) -> Result<Vec<u8>, ExecutionError> {
1741 let service = {
1742 let mut this = self.inner();
1743
1744 let application = this.load_service_instance(self.clone(), queried_id)?;
1746 this.push_application(ApplicationStatus {
1748 caller_id: None,
1749 id: queried_id,
1750 description: application.description,
1751 signer: None,
1752 });
1753 application.instance
1754 };
1755 let response = service
1756 .try_lock()
1757 .expect("Applications should not have reentrant calls")
1758 .handle_query(argument)?;
1759 self.inner().pop_application();
1760 Ok(response)
1761 }
1762
1763 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1764 let mut this = self.inner();
1765 let application_id = this.current_application().id;
1766
1767 this.scheduled_operations.push(Operation::User {
1768 application_id,
1769 bytes: operation,
1770 });
1771
1772 Ok(())
1773 }
1774
1775 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1776 if let Some(deadline) = self.inner().deadline {
1777 if Instant::now() >= deadline {
1778 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1779 }
1780 }
1781 Ok(())
1782 }
1783}
1784
/// A request received by `ServiceSyncRuntime::run` through its incoming channel.
pub enum ServiceRuntimeRequest {
    /// Asks the runtime to run a query and to reply through `callback`.
    Query {
        /// The application whose service should handle the query.
        application_id: ApplicationId,
        /// The context for the query; `run` passes it to `prepare_for_query` first.
        context: QueryContext,
        /// The query bytes handed to the service.
        query: Vec<u8>,
        /// The channel on which the query outcome, or the error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1794
/// The fields of a `MessageContext` that are kept for the message being executed.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from `MessageContext::is_bouncing`.
    is_bouncing: bool,
    /// Copied from `MessageContext::origin`.
    origin: ChainId,
}
1801
1802impl From<&MessageContext> for ExecutingMessage {
1803 fn from(context: &MessageContext) -> Self {
1804 ExecutingMessage {
1805 is_bouncing: context.is_bouncing,
1806 origin: context.origin,
1807 }
1808 }
1809}
1810
1811pub fn create_bytecode_blobs_sync(
1813 contract: Bytecode,
1814 service: Bytecode,
1815 vm_runtime: VmRuntime,
1816) -> (Vec<Blob>, ModuleId) {
1817 match vm_runtime {
1818 VmRuntime::Wasm => {
1819 let compressed_contract = contract.compress();
1820 let compressed_service = service.compress();
1821 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1822 let service_blob = Blob::new_service_bytecode(compressed_service);
1823 let module_id =
1824 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1825 (vec![contract_blob, service_blob], module_id)
1826 }
1827 VmRuntime::Evm => {
1828 let compressed_contract = contract.compress();
1829 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1830 let module_id = ModuleId::new(
1831 evm_contract_blob.id().hash,
1832 evm_contract_blob.id().hash,
1833 vm_runtime,
1834 );
1835 (vec![evm_contract_blob], module_id)
1836 }
1837 }
1838}