1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27
28use crate::{
29 execution::UserAction,
30 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
31 resources::ResourceController,
32 system::CreateApplicationResult,
33 util::{ReceiverExt, UnboundedSenderExt},
34 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
35 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
36 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
37 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
38};
39
40#[cfg(test)]
41#[path = "unit_tests/runtime_tests.rs"]
42mod tests;
43
44pub trait WithContext {
45 type UserContext;
46 type Code;
47}
48
49impl WithContext for UserContractInstance {
50 type UserContext = Timestamp;
51 type Code = UserContractCode;
52}
53
54impl WithContext for UserServiceInstance {
55 type UserContext = ();
56 type Code = UserServiceCode;
57}
58
59#[cfg(test)]
60impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
61 type UserContext = ();
62 type Code = ();
63}
64
65#[derive(Debug)]
66pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
67
68pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
69
70pub struct ServiceSyncRuntime {
71 runtime: SyncRuntime<UserServiceInstance>,
72 current_context: QueryContext,
73}
74
75#[derive(Debug)]
76pub struct SyncRuntimeHandle<UserInstance: WithContext>(
77 Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
78);
79
80pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
81pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
82
83#[derive(Debug)]
85pub struct SyncRuntimeInternal<UserInstance: WithContext> {
86 chain_id: ChainId,
88 height: BlockHeight,
91 round: Option<u32>,
93 #[debug(skip_if = Option::is_none)]
95 executing_message: Option<ExecutingMessage>,
96
97 execution_state_sender: ExecutionStateSender,
99
100 is_finalizing: bool,
104 applications_to_finalize: Vec<ApplicationId>,
106
107 preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
109 loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
111 call_stack: Vec<ApplicationStatus>,
113 active_applications: HashSet<ApplicationId>,
115 scheduled_operations: Vec<Operation>,
117
118 view_user_states: BTreeMap<ApplicationId, ViewUserState>,
120
121 deadline: Option<Instant>,
125
126 #[debug(skip_if = Option::is_none)]
128 refund_grant_to: Option<Account>,
129 resource_controller: ResourceController,
131 user_context: UserInstance::UserContext,
133 allow_application_logs: bool,
135}
136
137#[derive(Debug)]
139struct ApplicationStatus {
140 caller_id: Option<ApplicationId>,
142 id: ApplicationId,
144 description: ApplicationDescription,
146 signer: Option<AccountOwner>,
148}
149
150#[derive(Debug)]
152struct LoadedApplication<Instance> {
153 instance: Arc<Mutex<Instance>>,
154 description: ApplicationDescription,
155}
156
157impl<Instance> LoadedApplication<Instance> {
158 fn new(instance: Instance, description: ApplicationDescription) -> Self {
160 LoadedApplication {
161 instance: Arc::new(Mutex::new(instance)),
162 description,
163 }
164 }
165}
166
167impl<Instance> Clone for LoadedApplication<Instance> {
168 fn clone(&self) -> Self {
171 LoadedApplication {
172 instance: self.instance.clone(),
173 description: self.description.clone(),
174 }
175 }
176}
177
178#[derive(Debug)]
179enum Promise<T> {
180 Ready(T),
181 Pending(Receiver<T>),
182}
183
184impl<T> Promise<T> {
185 fn force(&mut self) -> Result<(), ExecutionError> {
186 if let Promise::Pending(receiver) = self {
187 let value = receiver
188 .recv_ref()
189 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
190 *self = Promise::Ready(value);
191 }
192 Ok(())
193 }
194
195 fn read(self) -> Result<T, ExecutionError> {
196 match self {
197 Promise::Pending(receiver) => {
198 let value = receiver.recv_response()?;
199 Ok(value)
200 }
201 Promise::Ready(value) => Ok(value),
202 }
203 }
204}
205
206#[derive(Debug, Default)]
208struct QueryManager<T> {
209 pending_queries: BTreeMap<u32, Promise<T>>,
211 query_count: u32,
213 active_query_count: u32,
215}
216
217impl<T> QueryManager<T> {
218 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
219 let id = self.query_count;
220 self.pending_queries.insert(id, Promise::Pending(receiver));
221 self.query_count = self
222 .query_count
223 .checked_add(1)
224 .ok_or(ArithmeticError::Overflow)?;
225 self.active_query_count = self
226 .active_query_count
227 .checked_add(1)
228 .ok_or(ArithmeticError::Overflow)?;
229 Ok(id)
230 }
231
232 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
233 let promise = self
234 .pending_queries
235 .remove(&id)
236 .ok_or(ExecutionError::InvalidPromise)?;
237 let value = promise.read()?;
238 self.active_query_count -= 1;
239 Ok(value)
240 }
241
242 fn force_all(&mut self) -> Result<(), ExecutionError> {
243 for promise in self.pending_queries.values_mut() {
244 promise.force()?;
245 }
246 Ok(())
247 }
248}
249
250type Keys = Vec<Vec<u8>>;
251type Value = Vec<u8>;
252type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
253
254#[derive(Debug, Default)]
255struct ViewUserState {
256 contains_key_queries: QueryManager<bool>,
258 contains_keys_queries: QueryManager<Vec<bool>>,
260 read_value_queries: QueryManager<Option<Value>>,
262 read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
264 find_keys_queries: QueryManager<Keys>,
266 find_key_values_queries: QueryManager<KeyValues>,
268}
269
270impl ViewUserState {
271 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
272 self.contains_key_queries.force_all()?;
273 self.contains_keys_queries.force_all()?;
274 self.read_value_queries.force_all()?;
275 self.read_multi_values_queries.force_all()?;
276 self.find_keys_queries.force_all()?;
277 self.find_key_values_queries.force_all()?;
278 Ok(())
279 }
280}
281
282impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
283 type Target = SyncRuntimeHandle<UserInstance>;
284
285 fn deref(&self) -> &Self::Target {
286 self.0.as_ref().expect(
287 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
288 )
289 }
290}
291
292impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
293 fn deref_mut(&mut self) -> &mut Self::Target {
294 self.0.as_mut().expect(
295 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
296 )
297 }
298}
299
300impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
301 fn drop(&mut self) {
302 if let Some(handle) = self.0.take() {
305 handle.inner().loaded_applications.clear();
306 }
307 }
308}
309
310impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
311 #[expect(clippy::too_many_arguments)]
312 fn new(
313 chain_id: ChainId,
314 height: BlockHeight,
315 round: Option<u32>,
316 executing_message: Option<ExecutingMessage>,
317 execution_state_sender: ExecutionStateSender,
318 deadline: Option<Instant>,
319 refund_grant_to: Option<Account>,
320 resource_controller: ResourceController,
321 user_context: UserInstance::UserContext,
322 allow_application_logs: bool,
323 ) -> Self {
324 Self {
325 chain_id,
326 height,
327 round,
328 executing_message,
329 execution_state_sender,
330 is_finalizing: false,
331 applications_to_finalize: Vec::new(),
332 preloaded_applications: HashMap::new(),
333 loaded_applications: HashMap::new(),
334 call_stack: Vec::new(),
335 active_applications: HashSet::new(),
336 view_user_states: BTreeMap::new(),
337 deadline,
338 refund_grant_to,
339 resource_controller,
340 scheduled_operations: Vec::new(),
341 user_context,
342 allow_application_logs,
343 }
344 }
345
346 fn current_application(&self) -> &ApplicationStatus {
354 self.call_stack
355 .last()
356 .expect("Call stack is unexpectedly empty")
357 }
358
359 fn push_application(&mut self, status: ApplicationStatus) {
363 self.active_applications.insert(status.id);
364 self.call_stack.push(status);
365 }
366
367 fn pop_application(&mut self) -> ApplicationStatus {
375 let status = self
376 .call_stack
377 .pop()
378 .expect("Can't remove application from empty call stack");
379 assert!(self.active_applications.remove(&status.id));
380 status
381 }
382
383 fn check_for_reentrancy(
387 &mut self,
388 application_id: ApplicationId,
389 ) -> Result<(), ExecutionError> {
390 ensure!(
391 !self.active_applications.contains(&application_id),
392 ExecutionError::ReentrantCall(application_id)
393 );
394 Ok(())
395 }
396}
397
398impl SyncRuntimeInternal<UserContractInstance> {
399 fn load_contract_instance(
401 &mut self,
402 this: SyncRuntimeHandle<UserContractInstance>,
403 id: ApplicationId,
404 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
405 match self.loaded_applications.entry(id) {
406 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
407
408 hash_map::Entry::Vacant(entry) => {
409 let (code, description) = match self.preloaded_applications.entry(id) {
412 #[cfg(web)]
414 hash_map::Entry::Vacant(_) => {
415 drop(this);
416 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
417 id,
418 )));
419 }
420 #[cfg(not(web))]
421 hash_map::Entry::Vacant(entry) => {
422 let (code, description) = self
423 .execution_state_sender
424 .send_request(move |callback| ExecutionRequest::LoadContract {
425 id,
426 callback,
427 })?
428 .recv_response()?;
429 entry.insert((code, description)).clone()
430 }
431 hash_map::Entry::Occupied(entry) => entry.get().clone(),
432 };
433 let instance = code.instantiate(this)?;
434
435 self.applications_to_finalize.push(id);
436 Ok(entry
437 .insert(LoadedApplication::new(instance, description))
438 .clone())
439 }
440 }
441 }
442
443 fn prepare_for_call(
445 &mut self,
446 this: ContractSyncRuntimeHandle,
447 authenticated: bool,
448 callee_id: ApplicationId,
449 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
450 self.check_for_reentrancy(callee_id)?;
451
452 ensure!(
453 !self.is_finalizing,
454 ExecutionError::CrossApplicationCallInFinalize {
455 caller_id: Box::new(self.current_application().id),
456 callee_id: Box::new(callee_id),
457 }
458 );
459
460 let application = self.load_contract_instance(this, callee_id)?;
462
463 let caller = self.current_application();
464 let caller_id = caller.id;
465 let caller_signer = caller.signer;
466 let authenticated_owner = match caller_signer {
468 Some(signer) if authenticated => Some(signer),
469 _ => None,
470 };
471 let authenticated_caller_id = authenticated.then_some(caller_id);
472 self.push_application(ApplicationStatus {
473 caller_id: authenticated_caller_id,
474 id: callee_id,
475 description: application.description,
476 signer: authenticated_owner,
478 });
479 Ok(application.instance)
480 }
481
482 fn finish_call(&mut self) {
484 self.pop_application();
485 }
486
487 fn run_service_oracle_query(
489 &mut self,
490 application_id: ApplicationId,
491 query: Vec<u8>,
492 ) -> Result<Vec<u8>, ExecutionError> {
493 let timeout = self
494 .resource_controller
495 .remaining_service_oracle_execution_time()?;
496 let execution_start = Instant::now();
497 let deadline = Some(execution_start + timeout);
498 let response = self
499 .execution_state_sender
500 .send_request(|callback| ExecutionRequest::QueryServiceOracle {
501 deadline,
502 application_id,
503 next_block_height: self.height,
504 query,
505 callback,
506 })?
507 .recv_response()?;
508
509 self.resource_controller
510 .track_service_oracle_execution(execution_start.elapsed())?;
511 self.resource_controller
512 .track_service_oracle_response(response.len())?;
513
514 Ok(response)
515 }
516}
517
518impl SyncRuntimeInternal<UserServiceInstance> {
519 fn load_service_instance(
521 &mut self,
522 this: SyncRuntimeHandle<UserServiceInstance>,
523 id: ApplicationId,
524 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
525 match self.loaded_applications.entry(id) {
526 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
527
528 hash_map::Entry::Vacant(entry) => {
529 let (code, description) = match self.preloaded_applications.entry(id) {
532 #[cfg(web)]
534 hash_map::Entry::Vacant(_) => {
535 drop(this);
536 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
537 id,
538 )));
539 }
540 #[cfg(not(web))]
541 hash_map::Entry::Vacant(entry) => {
542 let (code, description) = self
543 .execution_state_sender
544 .send_request(move |callback| ExecutionRequest::LoadService {
545 id,
546 callback,
547 })?
548 .recv_response()?;
549 entry.insert((code, description)).clone()
550 }
551 hash_map::Entry::Occupied(entry) => entry.get().clone(),
552 };
553 let instance = code.instantiate(this)?;
554
555 self.applications_to_finalize.push(id);
556 Ok(entry
557 .insert(LoadedApplication::new(instance, description))
558 .clone())
559 }
560 }
561 }
562}
563
564impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
565 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
566 let handle = self.0.take().expect(
567 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
568 );
569 let runtime = Arc::into_inner(handle.0)?
570 .into_inner()
571 .expect("`SyncRuntime` should run in a single thread which should not panic");
572 Some(runtime)
573 }
574}
575
576impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
577 for SyncRuntimeHandle<UserInstance>
578{
579 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
580 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
581 }
582}
583
584impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
585 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
586 self.0
587 .try_lock()
588 .expect("Synchronous runtimes run on a single execution thread")
589 }
590}
591
592impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
593where
594 Self: ContractOrServiceRuntime,
595{
596 type Read = ();
597 type ReadValueBytes = u32;
598 type ContainsKey = u32;
599 type ContainsKeys = u32;
600 type ReadMultiValuesBytes = u32;
601 type FindKeysByPrefix = u32;
602 type FindKeyValuesByPrefix = u32;
603
604 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
605 let mut this = self.inner();
606 let chain_id = this.chain_id;
607 this.resource_controller.track_runtime_chain_id()?;
608 Ok(chain_id)
609 }
610
611 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
612 let mut this = self.inner();
613 let height = this.height;
614 this.resource_controller.track_runtime_block_height()?;
615 Ok(height)
616 }
617
618 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
619 let mut this = self.inner();
620 let application_id = this.current_application().id;
621 this.resource_controller.track_runtime_application_id()?;
622 Ok(application_id)
623 }
624
625 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
626 let mut this = self.inner();
627 let application_creator_chain_id = this.current_application().description.creator_chain_id;
628 this.resource_controller.track_runtime_application_id()?;
629 Ok(application_creator_chain_id)
630 }
631
632 fn read_application_description(
633 &mut self,
634 application_id: ApplicationId,
635 ) -> Result<ApplicationDescription, ExecutionError> {
636 let mut this = self.inner();
637 let description = this
638 .execution_state_sender
639 .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
640 application_id,
641 callback,
642 })?
643 .recv_response()?;
644 this.resource_controller
645 .track_runtime_application_description(&description)?;
646 Ok(description)
647 }
648
649 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
650 let mut this = self.inner();
651 let parameters = this.current_application().description.parameters.clone();
652 this.resource_controller
653 .track_runtime_application_parameters(¶meters)?;
654 Ok(parameters)
655 }
656
657 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
658 let mut this = self.inner();
659 let timestamp = this
660 .execution_state_sender
661 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
662 .recv_response()?;
663 this.resource_controller.track_runtime_timestamp()?;
664 Ok(timestamp)
665 }
666
667 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
668 let mut this = self.inner();
669 let balance = this
670 .execution_state_sender
671 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
672 .recv_response()?;
673 this.resource_controller.track_runtime_balance()?;
674 Ok(balance)
675 }
676
677 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
678 let mut this = self.inner();
679 let balance = this
680 .execution_state_sender
681 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
682 .recv_response()?;
683 this.resource_controller.track_runtime_balance()?;
684 Ok(balance)
685 }
686
687 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
688 let mut this = self.inner();
689 let owner_balances = this
690 .execution_state_sender
691 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
692 .recv_response()?;
693 this.resource_controller
694 .track_runtime_owner_balances(&owner_balances)?;
695 Ok(owner_balances)
696 }
697
698 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
699 let mut this = self.inner();
700 let owners = this
701 .execution_state_sender
702 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
703 .recv_response()?;
704 this.resource_controller.track_runtime_owners(&owners)?;
705 Ok(owners)
706 }
707
708 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
709 let mut this = self.inner();
710 let chain_ownership = this
711 .execution_state_sender
712 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
713 .recv_response()?;
714 this.resource_controller
715 .track_runtime_chain_ownership(&chain_ownership)?;
716 Ok(chain_ownership)
717 }
718
719 fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
720 let this = self.inner();
721 let application_permissions = this
722 .execution_state_sender
723 .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
724 .recv_response()?;
725 Ok(application_permissions)
726 }
727
728 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
729 let mut this = self.inner();
730 let id = this.current_application().id;
731 this.resource_controller.track_read_operation()?;
732 let receiver = this
733 .execution_state_sender
734 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
735 let state = this.view_user_states.entry(id).or_default();
736 state.contains_key_queries.register(receiver)
737 }
738
739 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
740 let mut this = self.inner();
741 let id = this.current_application().id;
742 let state = this.view_user_states.entry(id).or_default();
743 let value = state.contains_key_queries.wait(*promise)?;
744 Ok(value)
745 }
746
747 fn contains_keys_new(
748 &mut self,
749 keys: Vec<Vec<u8>>,
750 ) -> Result<Self::ContainsKeys, ExecutionError> {
751 let mut this = self.inner();
752 let id = this.current_application().id;
753 this.resource_controller.track_read_operation()?;
754 let receiver = this
755 .execution_state_sender
756 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
757 let state = this.view_user_states.entry(id).or_default();
758 state.contains_keys_queries.register(receiver)
759 }
760
761 fn contains_keys_wait(
762 &mut self,
763 promise: &Self::ContainsKeys,
764 ) -> Result<Vec<bool>, ExecutionError> {
765 let mut this = self.inner();
766 let id = this.current_application().id;
767 let state = this.view_user_states.entry(id).or_default();
768 let value = state.contains_keys_queries.wait(*promise)?;
769 Ok(value)
770 }
771
772 fn read_multi_values_bytes_new(
773 &mut self,
774 keys: Vec<Vec<u8>>,
775 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
776 let mut this = self.inner();
777 let id = this.current_application().id;
778 this.resource_controller.track_read_operation()?;
779 let receiver = this.execution_state_sender.send_request(move |callback| {
780 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
781 })?;
782 let state = this.view_user_states.entry(id).or_default();
783 state.read_multi_values_queries.register(receiver)
784 }
785
786 fn read_multi_values_bytes_wait(
787 &mut self,
788 promise: &Self::ReadMultiValuesBytes,
789 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
790 let mut this = self.inner();
791 let id = this.current_application().id;
792 let state = this.view_user_states.entry(id).or_default();
793 let values = state.read_multi_values_queries.wait(*promise)?;
794 for value in &values {
795 if let Some(value) = &value {
796 this.resource_controller
797 .track_bytes_read(value.len() as u64)?;
798 }
799 }
800 Ok(values)
801 }
802
803 fn read_value_bytes_new(
804 &mut self,
805 key: Vec<u8>,
806 ) -> Result<Self::ReadValueBytes, ExecutionError> {
807 let mut this = self.inner();
808 let id = this.current_application().id;
809 this.resource_controller.track_read_operation()?;
810 let receiver = this
811 .execution_state_sender
812 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
813 let state = this.view_user_states.entry(id).or_default();
814 state.read_value_queries.register(receiver)
815 }
816
817 fn read_value_bytes_wait(
818 &mut self,
819 promise: &Self::ReadValueBytes,
820 ) -> Result<Option<Vec<u8>>, ExecutionError> {
821 let mut this = self.inner();
822 let id = this.current_application().id;
823 let value = {
824 let state = this.view_user_states.entry(id).or_default();
825 state.read_value_queries.wait(*promise)?
826 };
827 if let Some(value) = &value {
828 this.resource_controller
829 .track_bytes_read(value.len() as u64)?;
830 }
831 Ok(value)
832 }
833
834 fn find_keys_by_prefix_new(
835 &mut self,
836 key_prefix: Vec<u8>,
837 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
838 let mut this = self.inner();
839 let id = this.current_application().id;
840 this.resource_controller.track_read_operation()?;
841 let receiver = this.execution_state_sender.send_request(move |callback| {
842 ExecutionRequest::FindKeysByPrefix {
843 id,
844 key_prefix,
845 callback,
846 }
847 })?;
848 let state = this.view_user_states.entry(id).or_default();
849 state.find_keys_queries.register(receiver)
850 }
851
852 fn find_keys_by_prefix_wait(
853 &mut self,
854 promise: &Self::FindKeysByPrefix,
855 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
856 let mut this = self.inner();
857 let id = this.current_application().id;
858 let keys = {
859 let state = this.view_user_states.entry(id).or_default();
860 state.find_keys_queries.wait(*promise)?
861 };
862 let mut read_size = 0;
863 for key in &keys {
864 read_size += key.len();
865 }
866 this.resource_controller
867 .track_bytes_read(read_size as u64)?;
868 Ok(keys)
869 }
870
871 fn find_key_values_by_prefix_new(
872 &mut self,
873 key_prefix: Vec<u8>,
874 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
875 let mut this = self.inner();
876 let id = this.current_application().id;
877 this.resource_controller.track_read_operation()?;
878 let receiver = this.execution_state_sender.send_request(move |callback| {
879 ExecutionRequest::FindKeyValuesByPrefix {
880 id,
881 key_prefix,
882 callback,
883 }
884 })?;
885 let state = this.view_user_states.entry(id).or_default();
886 state.find_key_values_queries.register(receiver)
887 }
888
889 fn find_key_values_by_prefix_wait(
890 &mut self,
891 promise: &Self::FindKeyValuesByPrefix,
892 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
893 let mut this = self.inner();
894 let id = this.current_application().id;
895 let state = this.view_user_states.entry(id).or_default();
896 let key_values = state.find_key_values_queries.wait(*promise)?;
897 let mut read_size = 0;
898 for (key, value) in &key_values {
899 read_size += key.len() + value.len();
900 }
901 this.resource_controller
902 .track_bytes_read(read_size as u64)?;
903 Ok(key_values)
904 }
905
906 fn perform_http_request(
907 &mut self,
908 request: http::Request,
909 ) -> Result<http::Response, ExecutionError> {
910 let mut this = self.inner();
911 let app_permissions = this
912 .execution_state_sender
913 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
914 .recv_response()?;
915
916 let app_id = this.current_application().id;
917 ensure!(
918 app_permissions.can_make_http_requests(&app_id),
919 ExecutionError::UnauthorizedApplication(app_id)
920 );
921
922 this.resource_controller.track_http_request()?;
923
924 this.execution_state_sender
925 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
926 request,
927 http_responses_are_oracle_responses:
928 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
929 callback,
930 })?
931 .recv_response()
932 }
933
934 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
935 let this = self.inner();
936 this.execution_state_sender
937 .send_request(|callback| ExecutionRequest::AssertBefore {
938 timestamp,
939 callback,
940 })?
941 .recv_response()?
942 }
943
944 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
945 let this = self.inner();
946 let blob_id = hash.into();
947 let content = this
948 .execution_state_sender
949 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
950 .recv_response()?;
951 Ok(content.into_vec_or_clone())
952 }
953
954 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
955 let this = self.inner();
956 let blob_id = hash.into();
957 this.execution_state_sender
958 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
959 .recv_response()
960 }
961
962 fn has_empty_storage(&mut self, application: ApplicationId) -> Result<bool, ExecutionError> {
963 let this = self.inner();
964 let (key_size, value_size) = this
965 .execution_state_sender
966 .send_request(move |callback| ExecutionRequest::TotalStorageSize {
967 application,
968 callback,
969 })?
970 .recv_response()?;
971 Ok(key_size + value_size == 0)
972 }
973
974 fn maximum_blob_size(&mut self) -> Result<u64, ExecutionError> {
975 Ok(self.inner().resource_controller.policy().maximum_blob_size)
976 }
977
978 fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
979 Ok(self.inner().allow_application_logs)
980 }
981
982 #[cfg(web)]
983 fn send_log(&mut self, message: String, level: tracing::log::Level) {
984 let this = self.inner();
985 this.execution_state_sender
987 .unbounded_send(ExecutionRequest::Log { message, level })
988 .ok();
989 }
990}
991
992trait ContractOrServiceRuntime {
995 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
1001}
1002
1003impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
1004 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
1005}
1006
1007impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
1008 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
1009}
1010
1011impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
1012 fn clone(&self) -> Self {
1013 SyncRuntimeHandle(self.0.clone())
1014 }
1015}
1016
1017impl ContractSyncRuntime {
1018 pub(crate) fn new(
1019 execution_state_sender: ExecutionStateSender,
1020 chain_id: ChainId,
1021 refund_grant_to: Option<Account>,
1022 resource_controller: ResourceController,
1023 action: &UserAction,
1024 allow_application_logs: bool,
1025 ) -> Self {
1026 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1027 SyncRuntimeInternal::new(
1028 chain_id,
1029 action.height(),
1030 action.round(),
1031 if let UserAction::Message(context, _) = action {
1032 Some(context.into())
1033 } else {
1034 None
1035 },
1036 execution_state_sender,
1037 None,
1038 refund_grant_to,
1039 resource_controller,
1040 action.timestamp(),
1041 allow_application_logs,
1042 ),
1043 )))
1044 }
1045
1046 pub(crate) fn preload_contract(
1048 &self,
1049 id: ApplicationId,
1050 code: UserContractCode,
1051 description: ApplicationDescription,
1052 ) -> Result<(), ExecutionError> {
1053 let this = self
1054 .0
1055 .as_ref()
1056 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1057 let mut this_guard = this.inner();
1058
1059 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1060 entry.insert((code, description));
1061 }
1062
1063 Ok(())
1064 }
1065
1066 pub(crate) fn run_action(
1068 mut self,
1069 application_id: ApplicationId,
1070 chain_id: ChainId,
1071 action: UserAction,
1072 ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1073 let result = self
1074 .deref_mut()
1075 .run_action(application_id, chain_id, action)?;
1076 let runtime = self
1077 .into_inner()
1078 .expect("Runtime clones should have been freed by now");
1079
1080 Ok((result, runtime.resource_controller))
1081 }
1082}
1083
1084impl ContractSyncRuntimeHandle {
1085 fn run_action(
1086 &mut self,
1087 application_id: ApplicationId,
1088 chain_id: ChainId,
1089 action: UserAction,
1090 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1091 let finalize_context = FinalizeContext {
1092 authenticated_owner: action.signer(),
1093 chain_id,
1094 height: action.height(),
1095 round: action.round(),
1096 };
1097
1098 {
1099 let runtime = self.inner();
1100 assert_eq!(runtime.chain_id, chain_id);
1101 assert_eq!(runtime.height, action.height());
1102 }
1103
1104 let signer = action.signer();
1105 let closure = move |code: &mut UserContractInstance| match action {
1106 UserAction::Instantiate(_context, argument) => {
1107 code.instantiate(argument).map(|()| None)
1108 }
1109 UserAction::Operation(_context, operation) => {
1110 code.execute_operation(operation).map(Option::Some)
1111 }
1112 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1113 UserAction::ProcessStreams(_context, updates) => {
1114 code.process_streams(updates).map(|()| None)
1115 }
1116 };
1117
1118 let result = self.execute(application_id, signer, closure)?;
1119 self.finalize(finalize_context)?;
1120 Ok(result)
1121 }
1122
1123 fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
1125 let applications = mem::take(&mut self.inner().applications_to_finalize)
1126 .into_iter()
1127 .rev();
1128
1129 self.inner().is_finalizing = true;
1130
1131 for application in applications {
1132 self.execute(application, context.authenticated_owner, |contract| {
1133 contract.finalize().map(|_| None)
1134 })?;
1135 self.inner().loaded_applications.remove(&application);
1136 }
1137
1138 Ok(())
1139 }
1140
1141 fn execute(
1143 &mut self,
1144 application_id: ApplicationId,
1145 signer: Option<AccountOwner>,
1146 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1147 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1148 let contract = {
1149 let mut runtime = self.inner();
1150 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1151
1152 let status = ApplicationStatus {
1153 caller_id: None,
1154 id: application_id,
1155 description: application.description.clone(),
1156 signer,
1157 };
1158
1159 runtime.push_application(status);
1160
1161 application
1162 };
1163
1164 let result = closure(
1165 &mut contract
1166 .instance
1167 .try_lock()
1168 .expect("Application should not be already executing"),
1169 )?;
1170
1171 let mut runtime = self.inner();
1172 let application_status = runtime.pop_application();
1173 assert_eq!(application_status.caller_id, None);
1174 assert_eq!(application_status.id, application_id);
1175 assert_eq!(application_status.description, contract.description);
1176 assert_eq!(application_status.signer, signer);
1177 assert!(runtime.call_stack.is_empty());
1178
1179 Ok(result)
1180 }
1181}
1182
1183impl ContractRuntime for ContractSyncRuntimeHandle {
1184 fn authenticated_owner(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1185 let this = self.inner();
1186 Ok(this.current_application().signer)
1187 }
1188
1189 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1190 Ok(self
1191 .inner()
1192 .executing_message
1193 .map(|metadata| metadata.is_bouncing))
1194 }
1195
1196 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1197 Ok(self
1198 .inner()
1199 .executing_message
1200 .map(|metadata| metadata.origin))
1201 }
1202
1203 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1204 let this = self.inner();
1205 if this.call_stack.len() <= 1 {
1206 return Ok(None);
1207 }
1208 Ok(this.current_application().caller_id)
1209 }
1210
1211 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1212 Ok(match vm_runtime {
1213 VmRuntime::Wasm => {
1214 self.inner()
1215 .resource_controller
1216 .policy()
1217 .maximum_wasm_fuel_per_block
1218 }
1219 VmRuntime::Evm => {
1220 self.inner()
1221 .resource_controller
1222 .policy()
1223 .maximum_evm_fuel_per_block
1224 }
1225 })
1226 }
1227
1228 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1229 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1230 }
1231
1232 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1233 let mut this = self.inner();
1234 this.resource_controller.track_fuel(fuel, vm_runtime)
1235 }
1236
1237 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1238 let mut this = self.inner();
1239 let application = this.current_application();
1240 let application_id = application.id;
1241 let authenticated_owner = application.signer;
1242 let mut refund_grant_to = this.refund_grant_to;
1243
1244 let grant = this
1245 .resource_controller
1246 .policy()
1247 .total_price(&message.grant)?;
1248 if grant.is_zero() {
1249 refund_grant_to = None;
1250 } else {
1251 this.resource_controller.track_grant(grant)?;
1252 }
1253 let kind = if message.is_tracked {
1254 MessageKind::Tracked
1255 } else {
1256 MessageKind::Simple
1257 };
1258
1259 this.execution_state_sender
1260 .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1261 message: OutgoingMessage {
1262 destination: message.destination,
1263 authenticated_owner,
1264 refund_grant_to,
1265 grant,
1266 kind,
1267 message: Message::User {
1268 application_id,
1269 bytes: message.message,
1270 },
1271 },
1272 callback,
1273 })?
1274 .recv_response()?;
1275
1276 Ok(())
1277 }
1278
1279 fn transfer(
1280 &mut self,
1281 source: AccountOwner,
1282 destination: Account,
1283 amount: Amount,
1284 ) -> Result<(), ExecutionError> {
1285 let this = self.inner();
1286 let current_application = this.current_application();
1287 let application_id = current_application.id;
1288 let signer = current_application.signer;
1289
1290 this.execution_state_sender
1291 .send_request(|callback| ExecutionRequest::Transfer {
1292 source,
1293 destination,
1294 amount,
1295 signer,
1296 application_id,
1297 callback,
1298 })?
1299 .recv_response()?;
1300 Ok(())
1301 }
1302
1303 fn claim(
1304 &mut self,
1305 source: Account,
1306 destination: Account,
1307 amount: Amount,
1308 ) -> Result<(), ExecutionError> {
1309 let this = self.inner();
1310 let current_application = this.current_application();
1311 let application_id = current_application.id;
1312 let signer = current_application.signer;
1313
1314 this.execution_state_sender
1315 .send_request(|callback| ExecutionRequest::Claim {
1316 source,
1317 destination,
1318 amount,
1319 signer,
1320 application_id,
1321 callback,
1322 })?
1323 .recv_response()?;
1324 Ok(())
1325 }
1326
1327 fn try_call_application(
1328 &mut self,
1329 authenticated: bool,
1330 callee_id: ApplicationId,
1331 argument: Vec<u8>,
1332 ) -> Result<Vec<u8>, ExecutionError> {
1333 let contract = self
1334 .inner()
1335 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1336
1337 let value = contract
1338 .try_lock()
1339 .expect("Applications should not have reentrant calls")
1340 .execute_operation(argument)?;
1341
1342 self.inner().finish_call();
1343
1344 Ok(value)
1345 }
1346
1347 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1348 let mut this = self.inner();
1349 ensure!(
1350 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1351 ExecutionError::StreamNameTooLong
1352 );
1353 let application_id = GenericApplicationId::User(this.current_application().id);
1354 let stream_id = StreamId {
1355 stream_name,
1356 application_id,
1357 };
1358 let value_len = value.len() as u64;
1359 let index = this
1360 .execution_state_sender
1361 .send_request(|callback| ExecutionRequest::Emit {
1362 stream_id,
1363 value,
1364 callback,
1365 })?
1366 .recv_response()?;
1367 this.resource_controller.track_bytes_written(value_len)?;
1369 Ok(index)
1370 }
1371
1372 fn read_event(
1373 &mut self,
1374 chain_id: ChainId,
1375 stream_name: StreamName,
1376 index: u32,
1377 ) -> Result<Vec<u8>, ExecutionError> {
1378 let mut this = self.inner();
1379 ensure!(
1380 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1381 ExecutionError::StreamNameTooLong
1382 );
1383 let application_id = GenericApplicationId::User(this.current_application().id);
1384 let stream_id = StreamId {
1385 stream_name,
1386 application_id,
1387 };
1388 let event_id = EventId {
1389 stream_id,
1390 index,
1391 chain_id,
1392 };
1393 let event = this
1394 .execution_state_sender
1395 .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1396 .recv_response()?;
1397 this.resource_controller
1399 .track_bytes_read(event.len() as u64)?;
1400 Ok(event)
1401 }
1402
1403 fn subscribe_to_events(
1404 &mut self,
1405 chain_id: ChainId,
1406 application_id: ApplicationId,
1407 stream_name: StreamName,
1408 ) -> Result<(), ExecutionError> {
1409 let this = self.inner();
1410 ensure!(
1411 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1412 ExecutionError::StreamNameTooLong
1413 );
1414 let stream_id = StreamId {
1415 stream_name,
1416 application_id: application_id.into(),
1417 };
1418 let subscriber_app_id = this.current_application().id;
1419 this.execution_state_sender
1420 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1421 chain_id,
1422 stream_id,
1423 subscriber_app_id,
1424 callback,
1425 })?
1426 .recv_response()?;
1427 Ok(())
1428 }
1429
1430 fn unsubscribe_from_events(
1431 &mut self,
1432 chain_id: ChainId,
1433 application_id: ApplicationId,
1434 stream_name: StreamName,
1435 ) -> Result<(), ExecutionError> {
1436 let this = self.inner();
1437 ensure!(
1438 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1439 ExecutionError::StreamNameTooLong
1440 );
1441 let stream_id = StreamId {
1442 stream_name,
1443 application_id: application_id.into(),
1444 };
1445 let subscriber_app_id = this.current_application().id;
1446 this.execution_state_sender
1447 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1448 chain_id,
1449 stream_id,
1450 subscriber_app_id,
1451 callback,
1452 })?
1453 .recv_response()?;
1454 Ok(())
1455 }
1456
1457 fn query_service(
1458 &mut self,
1459 application_id: ApplicationId,
1460 query: Vec<u8>,
1461 ) -> Result<Vec<u8>, ExecutionError> {
1462 let mut this = self.inner();
1463
1464 let app_permissions = this
1465 .execution_state_sender
1466 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1467 .recv_response()?;
1468
1469 let app_id = this.current_application().id;
1470 ensure!(
1471 app_permissions.can_call_services(&app_id),
1472 ExecutionError::UnauthorizedApplication(app_id)
1473 );
1474
1475 this.resource_controller.track_service_oracle_call()?;
1476
1477 this.run_service_oracle_query(application_id, query)
1478 }
1479
1480 fn open_chain(
1481 &mut self,
1482 ownership: ChainOwnership,
1483 application_permissions: ApplicationPermissions,
1484 balance: Amount,
1485 ) -> Result<ChainId, ExecutionError> {
1486 let parent_id = self.inner().chain_id;
1487 let block_height = self.block_height()?;
1488
1489 let timestamp = self.inner().user_context;
1490
1491 let chain_id = self
1492 .inner()
1493 .execution_state_sender
1494 .send_request(|callback| ExecutionRequest::OpenChain {
1495 ownership,
1496 balance,
1497 parent_id,
1498 block_height,
1499 timestamp,
1500 application_permissions,
1501 callback,
1502 })?
1503 .recv_response()?;
1504
1505 Ok(chain_id)
1506 }
1507
1508 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1509 let this = self.inner();
1510 let application_id = this.current_application().id;
1511 this.execution_state_sender
1512 .send_request(|callback| ExecutionRequest::CloseChain {
1513 application_id,
1514 callback,
1515 })?
1516 .recv_response()?
1517 }
1518
1519 fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1520 let this = self.inner();
1521 let application_id = this.current_application().id;
1522 this.execution_state_sender
1523 .send_request(|callback| ExecutionRequest::ChangeOwnership {
1524 application_id,
1525 ownership,
1526 callback,
1527 })?
1528 .recv_response()?
1529 }
1530
1531 fn change_application_permissions(
1532 &mut self,
1533 application_permissions: ApplicationPermissions,
1534 ) -> Result<(), ExecutionError> {
1535 let this = self.inner();
1536 let application_id = this.current_application().id;
1537 this.execution_state_sender
1538 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1539 application_id,
1540 application_permissions,
1541 callback,
1542 })?
1543 .recv_response()?
1544 }
1545
1546 fn peek_application_index(&mut self) -> Result<u32, ExecutionError> {
1547 let index = self
1548 .inner()
1549 .execution_state_sender
1550 .send_request(move |callback| ExecutionRequest::PeekApplicationIndex { callback })?
1551 .recv_response()?;
1552 Ok(index)
1553 }
1554
1555 fn create_application(
1556 &mut self,
1557 module_id: ModuleId,
1558 parameters: Vec<u8>,
1559 argument: Vec<u8>,
1560 required_application_ids: Vec<ApplicationId>,
1561 ) -> Result<ApplicationId, ExecutionError> {
1562 let chain_id = self.inner().chain_id;
1563 let block_height = self.block_height()?;
1564
1565 let CreateApplicationResult { app_id } = self
1566 .inner()
1567 .execution_state_sender
1568 .send_request(move |callback| ExecutionRequest::CreateApplication {
1569 chain_id,
1570 block_height,
1571 module_id,
1572 parameters,
1573 required_application_ids,
1574 callback,
1575 })?
1576 .recv_response()?;
1577
1578 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1579
1580 contract
1581 .try_lock()
1582 .expect("Applications should not have reentrant calls")
1583 .instantiate(argument)?;
1584
1585 self.inner().finish_call();
1586
1587 Ok(app_id)
1588 }
1589
1590 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1591 let blob = Blob::new_data(bytes);
1592 let blob_id = blob.id();
1593 let this = self.inner();
1594 this.execution_state_sender
1595 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1596 .recv_response()?;
1597 Ok(DataBlobHash(blob_id.hash))
1598 }
1599
1600 fn publish_module(
1601 &mut self,
1602 contract: Bytecode,
1603 service: Bytecode,
1604 vm_runtime: VmRuntime,
1605 ) -> Result<ModuleId, ExecutionError> {
1606 let (blobs, module_id) =
1607 crate::runtime::create_bytecode_blobs_sync(contract, service, vm_runtime);
1608 let this = self.inner();
1609 for blob in blobs {
1610 this.execution_state_sender
1611 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1612 .recv_response()?;
1613 }
1614 Ok(module_id)
1615 }
1616
1617 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1618 let this = self.inner();
1619 let round = this.round;
1620 this.execution_state_sender
1621 .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1622 .recv_response()
1623 }
1624
1625 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1626 let mut this = self.inner();
1627 let id = this.current_application().id;
1628 let state = this.view_user_states.entry(id).or_default();
1629 state.force_all_pending_queries()?;
1630 this.resource_controller.track_write_operations(
1631 batch
1632 .num_operations()
1633 .try_into()
1634 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1635 )?;
1636 this.resource_controller
1637 .track_bytes_written(batch.size() as u64)?;
1638 this.execution_state_sender
1639 .send_request(|callback| ExecutionRequest::WriteBatch {
1640 id,
1641 batch,
1642 callback,
1643 })?
1644 .recv_response()?;
1645 Ok(())
1646 }
1647}
1648
1649impl ServiceSyncRuntime {
1650 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1652 Self::new_with_deadline(execution_state_sender, context, None)
1653 }
1654
1655 pub fn new_with_deadline(
1657 execution_state_sender: ExecutionStateSender,
1658 context: QueryContext,
1659 deadline: Option<Instant>,
1660 ) -> Self {
1661 let allow_application_logs = execution_state_sender
1663 .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1664 .ok()
1665 .and_then(|receiver| receiver.recv_response().ok())
1666 .unwrap_or(false);
1667
1668 let runtime = SyncRuntime(Some(
1669 SyncRuntimeInternal::new(
1670 context.chain_id,
1671 context.next_block_height,
1672 None,
1673 None,
1674 execution_state_sender,
1675 deadline,
1676 None,
1677 ResourceController::default(),
1678 (),
1679 allow_application_logs,
1680 )
1681 .into(),
1682 ));
1683
1684 ServiceSyncRuntime {
1685 runtime,
1686 current_context: context,
1687 }
1688 }
1689
1690 pub(crate) fn preload_service(
1692 &self,
1693 id: ApplicationId,
1694 code: UserServiceCode,
1695 description: ApplicationDescription,
1696 ) -> Result<(), ExecutionError> {
1697 let this = self
1698 .runtime
1699 .0
1700 .as_ref()
1701 .expect("services shouldn't be preloaded while the runtime is being dropped");
1702 let mut this_guard = this.inner();
1703
1704 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1705 entry.insert((code, description));
1706 }
1707
1708 Ok(())
1709 }
1710
1711 pub fn run(&mut self, incoming_requests: std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1713 while let Ok(request) = incoming_requests.recv() {
1714 let ServiceRuntimeRequest::Query {
1715 application_id,
1716 context,
1717 query,
1718 callback,
1719 } = request;
1720
1721 let result = self
1722 .prepare_for_query(context)
1723 .and_then(|()| self.run_query(application_id, query));
1724
1725 if let Err(err) = callback.send(result) {
1726 tracing::debug!(%err, "Receiver for query result has been dropped");
1727 }
1728 }
1729 }
1730
1731 pub(crate) fn prepare_for_query(
1733 &mut self,
1734 new_context: QueryContext,
1735 ) -> Result<(), ExecutionError> {
1736 let expected_context = QueryContext {
1737 local_time: new_context.local_time,
1738 ..self.current_context
1739 };
1740
1741 if new_context != expected_context {
1742 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1743 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1744 } else {
1745 self.handle_mut()
1746 .inner()
1747 .execution_state_sender
1748 .send_request(|callback| ExecutionRequest::SetLocalTime {
1749 local_time: new_context.local_time,
1750 callback,
1751 })?
1752 .recv_response()?;
1753 }
1754 Ok(())
1755 }
1756
1757 pub(crate) fn run_query(
1759 &mut self,
1760 application_id: ApplicationId,
1761 query: Vec<u8>,
1762 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1763 let this = self.handle_mut();
1764 let response = this.try_query_application(application_id, query)?;
1765 let operations = mem::take(&mut this.inner().scheduled_operations);
1766
1767 Ok(QueryOutcome {
1768 response,
1769 operations,
1770 })
1771 }
1772
1773 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1775 self.runtime.0.as_mut().expect(
1776 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1777 )
1778 }
1779}
1780
1781impl ServiceRuntime for ServiceSyncRuntimeHandle {
1782 fn try_query_application(
1784 &mut self,
1785 queried_id: ApplicationId,
1786 argument: Vec<u8>,
1787 ) -> Result<Vec<u8>, ExecutionError> {
1788 let service = {
1789 let mut this = self.inner();
1790
1791 let application = this.load_service_instance(self.clone(), queried_id)?;
1793 this.push_application(ApplicationStatus {
1795 caller_id: None,
1796 id: queried_id,
1797 description: application.description,
1798 signer: None,
1799 });
1800 application.instance
1801 };
1802 let response = service
1803 .try_lock()
1804 .expect("Applications should not have reentrant calls")
1805 .handle_query(argument)?;
1806 self.inner().pop_application();
1807 Ok(response)
1808 }
1809
1810 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1811 let mut this = self.inner();
1812 let application_id = this.current_application().id;
1813
1814 this.scheduled_operations.push(Operation::User {
1815 application_id,
1816 bytes: operation,
1817 });
1818
1819 Ok(())
1820 }
1821
1822 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1823 if let Some(deadline) = self.inner().deadline {
1824 if Instant::now() >= deadline {
1825 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1826 }
1827 }
1828 Ok(())
1829 }
1830}
1831
1832pub enum ServiceRuntimeRequest {
1834 Query {
1835 application_id: ApplicationId,
1836 context: QueryContext,
1837 query: Vec<u8>,
1838 callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
1839 },
1840}
1841
1842#[derive(Clone, Copy, Debug)]
1844struct ExecutingMessage {
1845 is_bouncing: bool,
1846 origin: ChainId,
1847}
1848
1849impl From<&MessageContext> for ExecutingMessage {
1850 fn from(context: &MessageContext) -> Self {
1851 ExecutingMessage {
1852 is_bouncing: context.is_bouncing,
1853 origin: context.origin,
1854 }
1855 }
1856}
1857
1858pub fn create_bytecode_blobs_sync(
1860 contract: Bytecode,
1861 service: Bytecode,
1862 vm_runtime: VmRuntime,
1863) -> (Vec<Blob>, ModuleId) {
1864 match vm_runtime {
1865 VmRuntime::Wasm => {
1866 let compressed_contract = contract.compress();
1867 let compressed_service = service.compress();
1868 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1869 let service_blob = Blob::new_service_bytecode(compressed_service);
1870 let module_id =
1871 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1872 (vec![contract_blob, service_blob], module_id)
1873 }
1874 VmRuntime::Evm => {
1875 let compressed_contract = contract.compress();
1876 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1877 let module_id = ModuleId::new(
1878 evm_contract_blob.id().hash,
1879 evm_contract_blob.id().hash,
1880 vm_runtime,
1881 );
1882 (vec![evm_contract_blob], module_id)
1883 }
1884 }
1885}