1use std::{
5 collections::{hash_map, BTreeMap, HashMap, HashSet},
6 mem,
7 ops::{Deref, DerefMut},
8 sync::{Arc, Mutex},
9};
10
11use custom_debug_derive::Debug;
12use linera_base::{
13 data_types::{
14 Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Bytecode,
15 SendMessageRequest, Timestamp,
16 },
17 ensure, http,
18 identifiers::{
19 Account, AccountOwner, ChainId, EventId, GenericApplicationId, StreamId, StreamName,
20 },
21 ownership::ChainOwnership,
22 time::Instant,
23 vm::VmRuntime,
24};
25use linera_views::batch::Batch;
26use oneshot::Receiver;
27use tracing::instrument;
28
29use crate::{
30 execution::UserAction,
31 execution_state_actor::{ExecutionRequest, ExecutionStateSender},
32 resources::ResourceController,
33 system::CreateApplicationResult,
34 util::{ReceiverExt, UnboundedSenderExt},
35 ApplicationDescription, ApplicationId, BaseRuntime, ContractRuntime, DataBlobHash,
36 ExecutionError, FinalizeContext, Message, MessageContext, MessageKind, ModuleId, Operation,
37 OutgoingMessage, QueryContext, QueryOutcome, ServiceRuntime, UserContractCode,
38 UserContractInstance, UserServiceCode, UserServiceInstance, MAX_STREAM_NAME_LEN,
39};
40
41#[cfg(test)]
42#[path = "unit_tests/runtime_tests.rs"]
43mod tests;
44
/// Associates a user application instance type with the auxiliary types the runtime keeps for it.
pub trait WithContext {
    /// The code type cached in `preloaded_applications`, from which instances are created.
    type Code;
    /// Extra data stored in the runtime's `user_context` field.
    type UserContext;
}
49
50impl WithContext for UserContractInstance {
51 type UserContext = Timestamp;
52 type Code = UserContractCode;
53}
54
55impl WithContext for UserServiceInstance {
56 type UserContext = ();
57 type Code = UserServiceCode;
58}
59
/// Test-only implementation for a type-erased instance; it needs neither code nor context.
#[cfg(test)]
impl WithContext for Arc<dyn std::any::Any + Send + Sync> {
    type Code = ();
    type UserContext = ();
}
65
66#[derive(Debug)]
67pub struct SyncRuntime<UserInstance: WithContext>(Option<SyncRuntimeHandle<UserInstance>>);
68
69pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
70
71pub struct ServiceSyncRuntime {
72 runtime: SyncRuntime<UserServiceInstance>,
73 current_context: QueryContext,
74}
75
76#[derive(Debug)]
77pub struct SyncRuntimeHandle<UserInstance: WithContext>(
78 Arc<Mutex<SyncRuntimeInternal<UserInstance>>>,
79);
80
81pub type ContractSyncRuntimeHandle = SyncRuntimeHandle<UserContractInstance>;
82pub type ServiceSyncRuntimeHandle = SyncRuntimeHandle<UserServiceInstance>;
83
84#[derive(Debug)]
86pub struct SyncRuntimeInternal<UserInstance: WithContext> {
87 chain_id: ChainId,
89 height: BlockHeight,
92 round: Option<u32>,
94 #[debug(skip_if = Option::is_none)]
96 executing_message: Option<ExecutingMessage>,
97
98 execution_state_sender: ExecutionStateSender,
100
101 is_finalizing: bool,
105 applications_to_finalize: Vec<ApplicationId>,
107
108 preloaded_applications: HashMap<ApplicationId, (UserInstance::Code, ApplicationDescription)>,
110 loaded_applications: HashMap<ApplicationId, LoadedApplication<UserInstance>>,
112 call_stack: Vec<ApplicationStatus>,
114 active_applications: HashSet<ApplicationId>,
116 scheduled_operations: Vec<Operation>,
118
119 view_user_states: BTreeMap<ApplicationId, ViewUserState>,
121
122 deadline: Option<Instant>,
126
127 #[debug(skip_if = Option::is_none)]
129 refund_grant_to: Option<Account>,
130 resource_controller: ResourceController,
132 user_context: UserInstance::UserContext,
134 allow_application_logs: bool,
136}
137
138#[derive(Debug)]
140struct ApplicationStatus {
141 caller_id: Option<ApplicationId>,
143 id: ApplicationId,
145 description: ApplicationDescription,
147 signer: Option<AccountOwner>,
149}
150
151#[derive(Debug)]
153struct LoadedApplication<Instance> {
154 instance: Arc<Mutex<Instance>>,
155 description: ApplicationDescription,
156}
157
158impl<Instance> LoadedApplication<Instance> {
159 fn new(instance: Instance, description: ApplicationDescription) -> Self {
161 LoadedApplication {
162 instance: Arc::new(Mutex::new(instance)),
163 description,
164 }
165 }
166}
167
168impl<Instance> Clone for LoadedApplication<Instance> {
169 fn clone(&self) -> Self {
172 LoadedApplication {
173 instance: self.instance.clone(),
174 description: self.description.clone(),
175 }
176 }
177}
178
179#[derive(Debug)]
180enum Promise<T> {
181 Ready(T),
182 Pending(Receiver<T>),
183}
184
185impl<T> Promise<T> {
186 fn force(&mut self) -> Result<(), ExecutionError> {
187 if let Promise::Pending(receiver) = self {
188 let value = receiver
189 .recv_ref()
190 .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
191 *self = Promise::Ready(value);
192 }
193 Ok(())
194 }
195
196 fn read(self) -> Result<T, ExecutionError> {
197 match self {
198 Promise::Pending(receiver) => {
199 let value = receiver.recv_response()?;
200 Ok(value)
201 }
202 Promise::Ready(value) => Ok(value),
203 }
204 }
205}
206
207#[derive(Debug, Default)]
209struct QueryManager<T> {
210 pending_queries: BTreeMap<u32, Promise<T>>,
212 query_count: u32,
214 active_query_count: u32,
216}
217
218impl<T> QueryManager<T> {
219 fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
220 let id = self.query_count;
221 self.pending_queries.insert(id, Promise::Pending(receiver));
222 self.query_count = self
223 .query_count
224 .checked_add(1)
225 .ok_or(ArithmeticError::Overflow)?;
226 self.active_query_count = self
227 .active_query_count
228 .checked_add(1)
229 .ok_or(ArithmeticError::Overflow)?;
230 Ok(id)
231 }
232
233 fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
234 let promise = self
235 .pending_queries
236 .remove(&id)
237 .ok_or(ExecutionError::InvalidPromise)?;
238 let value = promise.read()?;
239 self.active_query_count -= 1;
240 Ok(value)
241 }
242
243 fn force_all(&mut self) -> Result<(), ExecutionError> {
244 for promise in self.pending_queries.values_mut() {
245 promise.force()?;
246 }
247 Ok(())
248 }
249}
250
/// A serialized value.
type Value = Vec<u8>;
/// A list of serialized keys.
type Keys = Vec<Vec<u8>>;
/// A list of serialized key-value pairs.
type KeyValues = Vec<(Vec<u8>, Value)>;
254
255#[derive(Debug, Default)]
256struct ViewUserState {
257 contains_key_queries: QueryManager<bool>,
259 contains_keys_queries: QueryManager<Vec<bool>>,
261 read_value_queries: QueryManager<Option<Value>>,
263 read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
265 find_keys_queries: QueryManager<Keys>,
267 find_key_values_queries: QueryManager<KeyValues>,
269}
270
271impl ViewUserState {
272 fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
273 self.contains_key_queries.force_all()?;
274 self.contains_keys_queries.force_all()?;
275 self.read_value_queries.force_all()?;
276 self.read_multi_values_queries.force_all()?;
277 self.find_keys_queries.force_all()?;
278 self.find_key_values_queries.force_all()?;
279 Ok(())
280 }
281}
282
283impl<UserInstance: WithContext> Deref for SyncRuntime<UserInstance> {
284 type Target = SyncRuntimeHandle<UserInstance>;
285
286 fn deref(&self) -> &Self::Target {
287 self.0.as_ref().expect(
288 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
289 )
290 }
291}
292
293impl<UserInstance: WithContext> DerefMut for SyncRuntime<UserInstance> {
294 fn deref_mut(&mut self) -> &mut Self::Target {
295 self.0.as_mut().expect(
296 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
297 )
298 }
299}
300
301impl<UserInstance: WithContext> Drop for SyncRuntime<UserInstance> {
302 fn drop(&mut self) {
303 if let Some(handle) = self.0.take() {
306 handle.inner().loaded_applications.clear();
307 }
308 }
309}
310
311impl<UserInstance: WithContext> SyncRuntimeInternal<UserInstance> {
312 #[expect(clippy::too_many_arguments)]
313 fn new(
314 chain_id: ChainId,
315 height: BlockHeight,
316 round: Option<u32>,
317 executing_message: Option<ExecutingMessage>,
318 execution_state_sender: ExecutionStateSender,
319 deadline: Option<Instant>,
320 refund_grant_to: Option<Account>,
321 resource_controller: ResourceController,
322 user_context: UserInstance::UserContext,
323 allow_application_logs: bool,
324 ) -> Self {
325 Self {
326 chain_id,
327 height,
328 round,
329 executing_message,
330 execution_state_sender,
331 is_finalizing: false,
332 applications_to_finalize: Vec::new(),
333 preloaded_applications: HashMap::new(),
334 loaded_applications: HashMap::new(),
335 call_stack: Vec::new(),
336 active_applications: HashSet::new(),
337 view_user_states: BTreeMap::new(),
338 deadline,
339 refund_grant_to,
340 resource_controller,
341 scheduled_operations: Vec::new(),
342 user_context,
343 allow_application_logs,
344 }
345 }
346
347 fn current_application(&self) -> &ApplicationStatus {
355 self.call_stack
356 .last()
357 .expect("Call stack is unexpectedly empty")
358 }
359
360 fn push_application(&mut self, status: ApplicationStatus) {
364 self.active_applications.insert(status.id);
365 self.call_stack.push(status);
366 }
367
368 fn pop_application(&mut self) -> ApplicationStatus {
376 let status = self
377 .call_stack
378 .pop()
379 .expect("Can't remove application from empty call stack");
380 assert!(self.active_applications.remove(&status.id));
381 status
382 }
383
384 fn check_for_reentrancy(&self, application_id: ApplicationId) -> Result<(), ExecutionError> {
388 ensure!(
389 !self.active_applications.contains(&application_id),
390 ExecutionError::ReentrantCall(application_id)
391 );
392 Ok(())
393 }
394}
395
396impl SyncRuntimeInternal<UserContractInstance> {
397 #[instrument(skip_all, fields(application_id = %id))]
399 fn load_contract_instance(
400 &mut self,
401 this: SyncRuntimeHandle<UserContractInstance>,
402 id: ApplicationId,
403 ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
404 match self.loaded_applications.entry(id) {
405 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
406
407 hash_map::Entry::Vacant(entry) => {
408 let (code, description) = match self.preloaded_applications.entry(id) {
411 #[cfg(web)]
413 hash_map::Entry::Vacant(_) => {
414 drop(this);
415 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
416 id,
417 )));
418 }
419 #[cfg(not(web))]
420 hash_map::Entry::Vacant(entry) => {
421 let (code, description) = self
422 .execution_state_sender
423 .send_request(move |callback| ExecutionRequest::LoadContract {
424 id,
425 callback,
426 })?
427 .recv_response()?;
428 entry.insert((code, description)).clone()
429 }
430 hash_map::Entry::Occupied(entry) => entry.get().clone(),
431 };
432 let instance = code.instantiate(this)?;
433
434 self.applications_to_finalize.push(id);
435 Ok(entry
436 .insert(LoadedApplication::new(instance, description))
437 .clone())
438 }
439 }
440 }
441
442 fn prepare_for_call(
444 &mut self,
445 this: ContractSyncRuntimeHandle,
446 authenticated: bool,
447 callee_id: ApplicationId,
448 ) -> Result<Arc<Mutex<UserContractInstance>>, ExecutionError> {
449 self.check_for_reentrancy(callee_id)?;
450
451 ensure!(
452 !self.is_finalizing,
453 ExecutionError::CrossApplicationCallInFinalize {
454 caller_id: Box::new(self.current_application().id),
455 callee_id: Box::new(callee_id),
456 }
457 );
458
459 let application = self.load_contract_instance(this, callee_id)?;
461
462 let caller = self.current_application();
463 let caller_id = caller.id;
464 let caller_signer = caller.signer;
465 let authenticated_owner = match caller_signer {
467 Some(signer) if authenticated => Some(signer),
468 _ => None,
469 };
470 let authenticated_caller_id = authenticated.then_some(caller_id);
471 self.push_application(ApplicationStatus {
472 caller_id: authenticated_caller_id,
473 id: callee_id,
474 description: application.description,
475 signer: authenticated_owner,
477 });
478 Ok(application.instance)
479 }
480
481 fn finish_call(&mut self) {
483 self.pop_application();
484 }
485
486 fn run_service_oracle_query(
488 &mut self,
489 application_id: ApplicationId,
490 query: Vec<u8>,
491 ) -> Result<Vec<u8>, ExecutionError> {
492 let timeout = self
493 .resource_controller
494 .remaining_service_oracle_execution_time()?;
495 let execution_start = Instant::now();
496 let deadline = Some(execution_start + timeout);
497 let response = self
498 .execution_state_sender
499 .send_request(|callback| ExecutionRequest::QueryServiceOracle {
500 deadline,
501 application_id,
502 next_block_height: self.height,
503 query,
504 callback,
505 })?
506 .recv_response()?;
507
508 self.resource_controller
509 .track_service_oracle_execution(execution_start.elapsed())?;
510 self.resource_controller
511 .track_service_oracle_response(response.len())?;
512
513 Ok(response)
514 }
515}
516
517impl SyncRuntimeInternal<UserServiceInstance> {
518 fn load_service_instance(
520 &mut self,
521 this: SyncRuntimeHandle<UserServiceInstance>,
522 id: ApplicationId,
523 ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
524 match self.loaded_applications.entry(id) {
525 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
526
527 hash_map::Entry::Vacant(entry) => {
528 let (code, description) = match self.preloaded_applications.entry(id) {
531 #[cfg(web)]
533 hash_map::Entry::Vacant(_) => {
534 drop(this);
535 return Err(ExecutionError::UnsupportedDynamicApplicationLoad(Box::new(
536 id,
537 )));
538 }
539 #[cfg(not(web))]
540 hash_map::Entry::Vacant(entry) => {
541 let (code, description) = self
542 .execution_state_sender
543 .send_request(move |callback| ExecutionRequest::LoadService {
544 id,
545 callback,
546 })?
547 .recv_response()?;
548 entry.insert((code, description)).clone()
549 }
550 hash_map::Entry::Occupied(entry) => entry.get().clone(),
551 };
552 let instance = code.instantiate(this)?;
553
554 self.applications_to_finalize.push(id);
555 Ok(entry
556 .insert(LoadedApplication::new(instance, description))
557 .clone())
558 }
559 }
560 }
561}
562
563impl<UserInstance: WithContext> SyncRuntime<UserInstance> {
564 fn into_inner(mut self) -> Option<SyncRuntimeInternal<UserInstance>> {
565 let handle = self.0.take().expect(
566 "`SyncRuntime` should not be used after its `inner` contents have been moved out",
567 );
568 let runtime = Arc::into_inner(handle.0)?
569 .into_inner()
570 .expect("`SyncRuntime` should run in a single thread which should not panic");
571 Some(runtime)
572 }
573}
574
575impl<UserInstance: WithContext> From<SyncRuntimeInternal<UserInstance>>
576 for SyncRuntimeHandle<UserInstance>
577{
578 fn from(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
579 SyncRuntimeHandle(Arc::new(Mutex::new(runtime)))
580 }
581}
582
583impl<UserInstance: WithContext> SyncRuntimeHandle<UserInstance> {
584 fn inner(&self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
585 self.0
586 .try_lock()
587 .expect("Synchronous runtimes run on a single execution thread")
588 }
589}
590
591impl<UserInstance: WithContext> BaseRuntime for SyncRuntimeHandle<UserInstance>
592where
593 Self: ContractOrServiceRuntime,
594{
595 type Read = ();
596 type ReadValueBytes = u32;
597 type ContainsKey = u32;
598 type ContainsKeys = u32;
599 type ReadMultiValuesBytes = u32;
600 type FindKeysByPrefix = u32;
601 type FindKeyValuesByPrefix = u32;
602
603 fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
604 let mut this = self.inner();
605 let chain_id = this.chain_id;
606 this.resource_controller.track_runtime_chain_id()?;
607 Ok(chain_id)
608 }
609
610 fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
611 let mut this = self.inner();
612 let height = this.height;
613 this.resource_controller.track_runtime_block_height()?;
614 Ok(height)
615 }
616
617 fn application_id(&mut self) -> Result<ApplicationId, ExecutionError> {
618 let mut this = self.inner();
619 let application_id = this.current_application().id;
620 this.resource_controller.track_runtime_application_id()?;
621 Ok(application_id)
622 }
623
624 fn application_creator_chain_id(&mut self) -> Result<ChainId, ExecutionError> {
625 let mut this = self.inner();
626 let application_creator_chain_id = this.current_application().description.creator_chain_id;
627 this.resource_controller.track_runtime_application_id()?;
628 Ok(application_creator_chain_id)
629 }
630
631 fn read_application_description(
632 &mut self,
633 application_id: ApplicationId,
634 ) -> Result<ApplicationDescription, ExecutionError> {
635 let mut this = self.inner();
636 let description = this
637 .execution_state_sender
638 .send_request(|callback| ExecutionRequest::ReadApplicationDescription {
639 application_id,
640 callback,
641 })?
642 .recv_response()?;
643 this.resource_controller
644 .track_runtime_application_description(&description)?;
645 Ok(description)
646 }
647
648 fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
649 let mut this = self.inner();
650 let parameters = this.current_application().description.parameters.clone();
651 this.resource_controller
652 .track_runtime_application_parameters(¶meters)?;
653 Ok(parameters)
654 }
655
656 fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
657 let mut this = self.inner();
658 let timestamp = this
659 .execution_state_sender
660 .send_request(|callback| ExecutionRequest::SystemTimestamp { callback })?
661 .recv_response()?;
662 this.resource_controller.track_runtime_timestamp()?;
663 Ok(timestamp)
664 }
665
666 fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
667 let mut this = self.inner();
668 let balance = this
669 .execution_state_sender
670 .send_request(|callback| ExecutionRequest::ChainBalance { callback })?
671 .recv_response()?;
672 this.resource_controller.track_runtime_balance()?;
673 Ok(balance)
674 }
675
676 fn read_owner_balance(&mut self, owner: AccountOwner) -> Result<Amount, ExecutionError> {
677 let mut this = self.inner();
678 let balance = this
679 .execution_state_sender
680 .send_request(|callback| ExecutionRequest::OwnerBalance { owner, callback })?
681 .recv_response()?;
682 this.resource_controller.track_runtime_balance()?;
683 Ok(balance)
684 }
685
686 fn read_owner_balances(&mut self) -> Result<Vec<(AccountOwner, Amount)>, ExecutionError> {
687 let mut this = self.inner();
688 let owner_balances = this
689 .execution_state_sender
690 .send_request(|callback| ExecutionRequest::OwnerBalances { callback })?
691 .recv_response()?;
692 this.resource_controller
693 .track_runtime_owner_balances(&owner_balances)?;
694 Ok(owner_balances)
695 }
696
697 fn read_balance_owners(&mut self) -> Result<Vec<AccountOwner>, ExecutionError> {
698 let mut this = self.inner();
699 let owners = this
700 .execution_state_sender
701 .send_request(|callback| ExecutionRequest::BalanceOwners { callback })?
702 .recv_response()?;
703 this.resource_controller.track_runtime_owners(&owners)?;
704 Ok(owners)
705 }
706
707 fn read_allowance(
708 &mut self,
709 owner: AccountOwner,
710 spender: AccountOwner,
711 ) -> Result<Amount, ExecutionError> {
712 let this = self.inner();
713 let allowance = this
714 .execution_state_sender
715 .send_request(|callback| ExecutionRequest::Allowance {
716 owner,
717 spender,
718 callback,
719 })?
720 .recv_response()?;
721 Ok(allowance)
722 }
723
724 fn read_allowances(
725 &mut self,
726 ) -> Result<Vec<(AccountOwner, AccountOwner, Amount)>, ExecutionError> {
727 let this = self.inner();
728 let allowances = this
729 .execution_state_sender
730 .send_request(|callback| ExecutionRequest::Allowances { callback })?
731 .recv_response()?;
732 Ok(allowances)
733 }
734
735 fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
736 let mut this = self.inner();
737 let chain_ownership = this
738 .execution_state_sender
739 .send_request(|callback| ExecutionRequest::ChainOwnership { callback })?
740 .recv_response()?;
741 this.resource_controller
742 .track_runtime_chain_ownership(&chain_ownership)?;
743 Ok(chain_ownership)
744 }
745
746 fn application_permissions(&mut self) -> Result<ApplicationPermissions, ExecutionError> {
747 let this = self.inner();
748 let application_permissions = this
749 .execution_state_sender
750 .send_request(|callback| ExecutionRequest::ApplicationPermissions { callback })?
751 .recv_response()?;
752 Ok(application_permissions)
753 }
754
755 fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
756 let mut this = self.inner();
757 let id = this.current_application().id;
758 this.resource_controller.track_read_operation()?;
759 let receiver = this
760 .execution_state_sender
761 .send_request(move |callback| ExecutionRequest::ContainsKey { id, key, callback })?;
762 let state = this.view_user_states.entry(id).or_default();
763 state.contains_key_queries.register(receiver)
764 }
765
766 fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
767 let mut this = self.inner();
768 let id = this.current_application().id;
769 let state = this.view_user_states.entry(id).or_default();
770 let value = state.contains_key_queries.wait(*promise)?;
771 Ok(value)
772 }
773
774 fn contains_keys_new(
775 &mut self,
776 keys: Vec<Vec<u8>>,
777 ) -> Result<Self::ContainsKeys, ExecutionError> {
778 let mut this = self.inner();
779 let id = this.current_application().id;
780 this.resource_controller.track_read_operation()?;
781 let receiver = this
782 .execution_state_sender
783 .send_request(move |callback| ExecutionRequest::ContainsKeys { id, keys, callback })?;
784 let state = this.view_user_states.entry(id).or_default();
785 state.contains_keys_queries.register(receiver)
786 }
787
788 fn contains_keys_wait(
789 &mut self,
790 promise: &Self::ContainsKeys,
791 ) -> Result<Vec<bool>, ExecutionError> {
792 let mut this = self.inner();
793 let id = this.current_application().id;
794 let state = this.view_user_states.entry(id).or_default();
795 let value = state.contains_keys_queries.wait(*promise)?;
796 Ok(value)
797 }
798
799 fn read_multi_values_bytes_new(
800 &mut self,
801 keys: Vec<Vec<u8>>,
802 ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
803 let mut this = self.inner();
804 let id = this.current_application().id;
805 this.resource_controller.track_read_operation()?;
806 let receiver = this.execution_state_sender.send_request(move |callback| {
807 ExecutionRequest::ReadMultiValuesBytes { id, keys, callback }
808 })?;
809 let state = this.view_user_states.entry(id).or_default();
810 state.read_multi_values_queries.register(receiver)
811 }
812
813 fn read_multi_values_bytes_wait(
814 &mut self,
815 promise: &Self::ReadMultiValuesBytes,
816 ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
817 let mut this = self.inner();
818 let id = this.current_application().id;
819 let state = this.view_user_states.entry(id).or_default();
820 let values = state.read_multi_values_queries.wait(*promise)?;
821 for value in &values {
822 if let Some(value) = &value {
823 this.resource_controller
824 .track_bytes_read(value.len() as u64)?;
825 }
826 }
827 Ok(values)
828 }
829
830 fn read_value_bytes_new(
831 &mut self,
832 key: Vec<u8>,
833 ) -> Result<Self::ReadValueBytes, ExecutionError> {
834 let mut this = self.inner();
835 let id = this.current_application().id;
836 this.resource_controller.track_read_operation()?;
837 let receiver = this
838 .execution_state_sender
839 .send_request(move |callback| ExecutionRequest::ReadValueBytes { id, key, callback })?;
840 let state = this.view_user_states.entry(id).or_default();
841 state.read_value_queries.register(receiver)
842 }
843
844 fn read_value_bytes_wait(
845 &mut self,
846 promise: &Self::ReadValueBytes,
847 ) -> Result<Option<Vec<u8>>, ExecutionError> {
848 let mut this = self.inner();
849 let id = this.current_application().id;
850 let value = {
851 let state = this.view_user_states.entry(id).or_default();
852 state.read_value_queries.wait(*promise)?
853 };
854 if let Some(value) = &value {
855 this.resource_controller
856 .track_bytes_read(value.len() as u64)?;
857 }
858 Ok(value)
859 }
860
861 fn find_keys_by_prefix_new(
862 &mut self,
863 key_prefix: Vec<u8>,
864 ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
865 let mut this = self.inner();
866 let id = this.current_application().id;
867 this.resource_controller.track_read_operation()?;
868 let receiver = this.execution_state_sender.send_request(move |callback| {
869 ExecutionRequest::FindKeysByPrefix {
870 id,
871 key_prefix,
872 callback,
873 }
874 })?;
875 let state = this.view_user_states.entry(id).or_default();
876 state.find_keys_queries.register(receiver)
877 }
878
879 fn find_keys_by_prefix_wait(
880 &mut self,
881 promise: &Self::FindKeysByPrefix,
882 ) -> Result<Vec<Vec<u8>>, ExecutionError> {
883 let mut this = self.inner();
884 let id = this.current_application().id;
885 let keys = {
886 let state = this.view_user_states.entry(id).or_default();
887 state.find_keys_queries.wait(*promise)?
888 };
889 let mut read_size = 0;
890 for key in &keys {
891 read_size += key.len();
892 }
893 this.resource_controller
894 .track_bytes_read(read_size as u64)?;
895 Ok(keys)
896 }
897
898 fn find_key_values_by_prefix_new(
899 &mut self,
900 key_prefix: Vec<u8>,
901 ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
902 let mut this = self.inner();
903 let id = this.current_application().id;
904 this.resource_controller.track_read_operation()?;
905 let receiver = this.execution_state_sender.send_request(move |callback| {
906 ExecutionRequest::FindKeyValuesByPrefix {
907 id,
908 key_prefix,
909 callback,
910 }
911 })?;
912 let state = this.view_user_states.entry(id).or_default();
913 state.find_key_values_queries.register(receiver)
914 }
915
916 fn find_key_values_by_prefix_wait(
917 &mut self,
918 promise: &Self::FindKeyValuesByPrefix,
919 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
920 let mut this = self.inner();
921 let id = this.current_application().id;
922 let state = this.view_user_states.entry(id).or_default();
923 let key_values = state.find_key_values_queries.wait(*promise)?;
924 let mut read_size = 0;
925 for (key, value) in &key_values {
926 read_size += key.len() + value.len();
927 }
928 this.resource_controller
929 .track_bytes_read(read_size as u64)?;
930 Ok(key_values)
931 }
932
933 fn perform_http_request(
934 &mut self,
935 request: http::Request,
936 ) -> Result<http::Response, ExecutionError> {
937 let mut this = self.inner();
938 let app_permissions = this
939 .execution_state_sender
940 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
941 .recv_response()?;
942
943 let app_id = this.current_application().id;
944 ensure!(
945 app_permissions.can_make_http_requests(&app_id),
946 ExecutionError::UnauthorizedApplication(app_id)
947 );
948
949 this.resource_controller.track_http_request()?;
950
951 this.execution_state_sender
952 .send_request(|callback| ExecutionRequest::PerformHttpRequest {
953 request,
954 http_responses_are_oracle_responses:
955 Self::LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE,
956 callback,
957 })?
958 .recv_response()
959 }
960
961 fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError> {
962 let this = self.inner();
963 this.execution_state_sender
964 .send_request(|callback| ExecutionRequest::AssertBefore {
965 timestamp,
966 callback,
967 })?
968 .recv_response()?
969 }
970
971 fn read_data_blob(&mut self, hash: DataBlobHash) -> Result<Vec<u8>, ExecutionError> {
972 let this = self.inner();
973 let blob_id = hash.into();
974 let content = this
975 .execution_state_sender
976 .send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
977 .recv_response()?;
978 Ok(content.into_vec_or_clone())
979 }
980
981 fn assert_data_blob_exists(&mut self, hash: DataBlobHash) -> Result<(), ExecutionError> {
982 let this = self.inner();
983 let blob_id = hash.into();
984 this.execution_state_sender
985 .send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
986 .recv_response()
987 }
988
989 fn has_empty_storage(&mut self, application: ApplicationId) -> Result<bool, ExecutionError> {
990 let this = self.inner();
991 this.execution_state_sender
992 .send_request(move |callback| ExecutionRequest::HasEmptyStorage {
993 application,
994 callback,
995 })?
996 .recv_response()
997 }
998
999 fn maximum_blob_size(&mut self) -> Result<u64, ExecutionError> {
1000 Ok(self.inner().resource_controller.policy().maximum_blob_size)
1001 }
1002
1003 fn allow_application_logs(&mut self) -> Result<bool, ExecutionError> {
1004 Ok(self.inner().allow_application_logs)
1005 }
1006
1007 #[cfg(web)]
1008 fn send_log(&mut self, message: String, level: tracing::log::Level) {
1009 let this = self.inner();
1010 this.execution_state_sender
1012 .unbounded_send(ExecutionRequest::Log { message, level })
1013 .ok();
1014 }
1015}
1016
/// Behavior that differs between the contract and the service runtime handles.
trait ContractOrServiceRuntime {
    /// Value sent as `http_responses_are_oracle_responses` with HTTP requests.
    const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool;
}
1027
1028impl ContractOrServiceRuntime for ContractSyncRuntimeHandle {
1029 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = true;
1030}
1031
1032impl ContractOrServiceRuntime for ServiceSyncRuntimeHandle {
1033 const LIMIT_HTTP_RESPONSE_SIZE_TO_ORACLE_RESPONSE_SIZE: bool = false;
1034}
1035
1036impl<UserInstance: WithContext> Clone for SyncRuntimeHandle<UserInstance> {
1037 fn clone(&self) -> Self {
1038 SyncRuntimeHandle(self.0.clone())
1039 }
1040}
1041
1042impl ContractSyncRuntime {
1043 pub(crate) fn new(
1044 execution_state_sender: ExecutionStateSender,
1045 chain_id: ChainId,
1046 refund_grant_to: Option<Account>,
1047 resource_controller: ResourceController,
1048 action: &UserAction,
1049 allow_application_logs: bool,
1050 ) -> Self {
1051 SyncRuntime(Some(ContractSyncRuntimeHandle::from(
1052 SyncRuntimeInternal::new(
1053 chain_id,
1054 action.height(),
1055 action.round(),
1056 if let UserAction::Message(context, _) = action {
1057 Some(context.into())
1058 } else {
1059 None
1060 },
1061 execution_state_sender,
1062 None,
1063 refund_grant_to,
1064 resource_controller,
1065 action.timestamp(),
1066 allow_application_logs,
1067 ),
1068 )))
1069 }
1070
1071 pub(crate) fn preload_contract(
1073 &self,
1074 id: ApplicationId,
1075 code: UserContractCode,
1076 description: ApplicationDescription,
1077 ) {
1078 let this = self
1079 .0
1080 .as_ref()
1081 .expect("contracts shouldn't be preloaded while the runtime is being dropped");
1082 let mut this_guard = this.inner();
1083
1084 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1085 entry.insert((code, description));
1086 }
1087 }
1088
1089 pub(crate) fn run_action(
1091 mut self,
1092 application_id: ApplicationId,
1093 chain_id: ChainId,
1094 action: UserAction,
1095 ) -> Result<(Option<Vec<u8>>, ResourceController), ExecutionError> {
1096 let result = self
1097 .deref_mut()
1098 .run_action(application_id, chain_id, action)?;
1099 let runtime = self
1100 .into_inner()
1101 .expect("Runtime clones should have been freed by now");
1102
1103 Ok((result, runtime.resource_controller))
1104 }
1105}
1106
1107impl ContractSyncRuntimeHandle {
1108 #[instrument(skip_all, fields(application_id = %application_id))]
1109 fn run_action(
1110 &self,
1111 application_id: ApplicationId,
1112 chain_id: ChainId,
1113 action: UserAction,
1114 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1115 let finalize_context = FinalizeContext {
1116 authenticated_owner: action.signer(),
1117 chain_id,
1118 height: action.height(),
1119 round: action.round(),
1120 };
1121
1122 {
1123 let runtime = self.inner();
1124 assert_eq!(runtime.chain_id, chain_id);
1125 assert_eq!(runtime.height, action.height());
1126 }
1127
1128 let signer = action.signer();
1129 let closure = move |code: &mut UserContractInstance| match action {
1130 UserAction::Instantiate(_context, argument) => {
1131 code.instantiate(argument).map(|()| None)
1132 }
1133 UserAction::Operation(_context, operation) => {
1134 code.execute_operation(operation).map(Option::Some)
1135 }
1136 UserAction::Message(_context, message) => code.execute_message(message).map(|()| None),
1137 UserAction::ProcessStreams(_context, updates) => {
1138 code.process_streams(updates).map(|()| None)
1139 }
1140 };
1141
1142 let result = self.execute(application_id, signer, closure)?;
1143 self.finalize(finalize_context)?;
1144 Ok(result)
1145 }
1146
1147 #[instrument(skip_all)]
1149 fn finalize(&self, context: FinalizeContext) -> Result<(), ExecutionError> {
1150 let applications = mem::take(&mut self.inner().applications_to_finalize)
1151 .into_iter()
1152 .rev();
1153
1154 self.inner().is_finalizing = true;
1155
1156 for application in applications {
1157 self.execute(application, context.authenticated_owner, |contract| {
1158 contract.finalize().map(|_| None)
1159 })?;
1160 self.inner().loaded_applications.remove(&application);
1161 }
1162
1163 Ok(())
1164 }
1165
1166 #[instrument(skip_all, fields(application_id = %application_id))]
1168 fn execute(
1169 &self,
1170 application_id: ApplicationId,
1171 signer: Option<AccountOwner>,
1172 closure: impl FnOnce(&mut UserContractInstance) -> Result<Option<Vec<u8>>, ExecutionError>,
1173 ) -> Result<Option<Vec<u8>>, ExecutionError> {
1174 let contract = {
1175 let mut runtime = self.inner();
1176 let application = runtime.load_contract_instance(self.clone(), application_id)?;
1177
1178 let status = ApplicationStatus {
1179 caller_id: None,
1180 id: application_id,
1181 description: application.description.clone(),
1182 signer,
1183 };
1184
1185 runtime.push_application(status);
1186
1187 application
1188 };
1189
1190 let result = closure(
1191 &mut contract
1192 .instance
1193 .try_lock()
1194 .expect("Application should not be already executing"),
1195 )?;
1196
1197 let mut runtime = self.inner();
1198 let application_status = runtime.pop_application();
1199 assert_eq!(application_status.caller_id, None);
1200 assert_eq!(application_status.id, application_id);
1201 assert_eq!(application_status.description, contract.description);
1202 assert_eq!(application_status.signer, signer);
1203 assert!(runtime.call_stack.is_empty());
1204
1205 Ok(result)
1206 }
1207}
1208
1209impl ContractRuntime for ContractSyncRuntimeHandle {
1210 fn authenticated_owner(&mut self) -> Result<Option<AccountOwner>, ExecutionError> {
1211 let this = self.inner();
1212 Ok(this.current_application().signer)
1213 }
1214
1215 fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
1216 Ok(self
1217 .inner()
1218 .executing_message
1219 .map(|metadata| metadata.is_bouncing))
1220 }
1221
1222 fn message_origin_chain_id(&mut self) -> Result<Option<ChainId>, ExecutionError> {
1223 Ok(self
1224 .inner()
1225 .executing_message
1226 .map(|metadata| metadata.origin))
1227 }
1228
1229 fn message_origin_timestamp(&mut self) -> Result<Option<Timestamp>, ExecutionError> {
1230 Ok(self
1231 .inner()
1232 .executing_message
1233 .map(|metadata| metadata.origin_timestamp))
1234 }
1235
1236 fn authenticated_caller_id(&mut self) -> Result<Option<ApplicationId>, ExecutionError> {
1237 let this = self.inner();
1238 if this.call_stack.len() <= 1 {
1239 return Ok(None);
1240 }
1241 Ok(this.current_application().caller_id)
1242 }
1243
1244 fn maximum_fuel_per_block(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1245 Ok(match vm_runtime {
1246 VmRuntime::Wasm => {
1247 self.inner()
1248 .resource_controller
1249 .policy()
1250 .maximum_wasm_fuel_per_block
1251 }
1252 VmRuntime::Evm => {
1253 self.inner()
1254 .resource_controller
1255 .policy()
1256 .maximum_evm_fuel_per_block
1257 }
1258 })
1259 }
1260
1261 fn remaining_fuel(&mut self, vm_runtime: VmRuntime) -> Result<u64, ExecutionError> {
1262 Ok(self.inner().resource_controller.remaining_fuel(vm_runtime))
1263 }
1264
1265 fn consume_fuel(&mut self, fuel: u64, vm_runtime: VmRuntime) -> Result<(), ExecutionError> {
1266 let mut this = self.inner();
1267 this.resource_controller.track_fuel(fuel, vm_runtime)
1268 }
1269
1270 fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
1271 let mut this = self.inner();
1272 let application = this.current_application();
1273 let application_id = application.id;
1274 let authenticated_owner = application.signer;
1275 let mut refund_grant_to = this.refund_grant_to;
1276
1277 let grant = this
1278 .resource_controller
1279 .policy()
1280 .total_price(&message.grant)?;
1281 if grant.is_zero() {
1282 refund_grant_to = None;
1283 } else {
1284 this.resource_controller.track_grant(grant)?;
1285 }
1286 let kind = if message.is_tracked {
1287 MessageKind::Tracked
1288 } else {
1289 MessageKind::Simple
1290 };
1291
1292 this.execution_state_sender
1293 .send_request(|callback| ExecutionRequest::AddOutgoingMessage {
1294 message: OutgoingMessage {
1295 destination: message.destination,
1296 authenticated_owner,
1297 refund_grant_to,
1298 grant,
1299 kind,
1300 message: Message::User {
1301 application_id,
1302 bytes: message.message,
1303 },
1304 },
1305 callback,
1306 })?
1307 .recv_response()?;
1308
1309 Ok(())
1310 }
1311
1312 fn transfer(
1313 &mut self,
1314 source: AccountOwner,
1315 destination: Account,
1316 amount: Amount,
1317 ) -> Result<(), ExecutionError> {
1318 let this = self.inner();
1319 let current_application = this.current_application();
1320 let application_id = current_application.id;
1321 let signer = current_application.signer;
1322
1323 this.execution_state_sender
1324 .send_request(|callback| ExecutionRequest::Transfer {
1325 source,
1326 destination,
1327 amount,
1328 signer,
1329 application_id,
1330 callback,
1331 })?
1332 .recv_response()?;
1333 Ok(())
1334 }
1335
1336 fn claim(
1337 &mut self,
1338 source: Account,
1339 destination: Account,
1340 amount: Amount,
1341 ) -> Result<(), ExecutionError> {
1342 let this = self.inner();
1343 let current_application = this.current_application();
1344 let application_id = current_application.id;
1345 let signer = current_application.signer;
1346
1347 this.execution_state_sender
1348 .send_request(|callback| ExecutionRequest::Claim {
1349 source,
1350 destination,
1351 amount,
1352 signer,
1353 application_id,
1354 callback,
1355 })?
1356 .recv_response()?;
1357 Ok(())
1358 }
1359
1360 fn approve(
1361 &mut self,
1362 owner: AccountOwner,
1363 spender: AccountOwner,
1364 amount: Amount,
1365 ) -> Result<(), ExecutionError> {
1366 let this = self.inner();
1367 let current_application = this.current_application();
1368 let application_id = current_application.id;
1369 let signer = current_application.signer;
1370
1371 this.execution_state_sender
1372 .send_request(|callback| ExecutionRequest::Approve {
1373 owner,
1374 spender,
1375 amount,
1376 signer,
1377 application_id,
1378 callback,
1379 })?
1380 .recv_response()?;
1381 Ok(())
1382 }
1383
1384 fn transfer_from(
1385 &mut self,
1386 owner: AccountOwner,
1387 spender: AccountOwner,
1388 destination: Account,
1389 amount: Amount,
1390 ) -> Result<(), ExecutionError> {
1391 let this = self.inner();
1392 let current_application = this.current_application();
1393 let application_id = current_application.id;
1394 let signer = current_application.signer;
1395
1396 this.execution_state_sender
1397 .send_request(|callback| ExecutionRequest::TransferFrom {
1398 owner,
1399 spender,
1400 destination,
1401 amount,
1402 signer,
1403 application_id,
1404 callback,
1405 })?
1406 .recv_response()?;
1407 Ok(())
1408 }
1409
1410 fn try_call_application(
1411 &mut self,
1412 authenticated: bool,
1413 callee_id: ApplicationId,
1414 argument: Vec<u8>,
1415 ) -> Result<Vec<u8>, ExecutionError> {
1416 let contract = self
1417 .inner()
1418 .prepare_for_call(self.clone(), authenticated, callee_id)?;
1419
1420 let value = contract
1421 .try_lock()
1422 .expect("Applications should not have reentrant calls")
1423 .execute_operation(argument)?;
1424
1425 self.inner().finish_call();
1426
1427 Ok(value)
1428 }
1429
1430 fn emit(&mut self, stream_name: StreamName, value: Vec<u8>) -> Result<u32, ExecutionError> {
1431 let mut this = self.inner();
1432 ensure!(
1433 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1434 ExecutionError::StreamNameTooLong
1435 );
1436 let application_id = GenericApplicationId::User(this.current_application().id);
1437 let stream_id = StreamId {
1438 stream_name,
1439 application_id,
1440 };
1441 let value_len = value.len() as u64;
1442 let index = this
1443 .execution_state_sender
1444 .send_request(|callback| ExecutionRequest::Emit {
1445 stream_id,
1446 value,
1447 callback,
1448 })?
1449 .recv_response()?;
1450 this.resource_controller.track_bytes_written(value_len)?;
1452 Ok(index)
1453 }
1454
1455 fn read_event(
1456 &mut self,
1457 chain_id: ChainId,
1458 stream_name: StreamName,
1459 index: u32,
1460 ) -> Result<Vec<u8>, ExecutionError> {
1461 let mut this = self.inner();
1462 ensure!(
1463 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1464 ExecutionError::StreamNameTooLong
1465 );
1466 let application_id = GenericApplicationId::User(this.current_application().id);
1467 let stream_id = StreamId {
1468 stream_name,
1469 application_id,
1470 };
1471 let event_id = EventId {
1472 stream_id,
1473 index,
1474 chain_id,
1475 };
1476 let event = this
1477 .execution_state_sender
1478 .send_request(|callback| ExecutionRequest::ReadEvent { event_id, callback })?
1479 .recv_response()?;
1480 this.resource_controller
1482 .track_bytes_read(event.len() as u64)?;
1483 Ok(event)
1484 }
1485
1486 fn subscribe_to_events(
1487 &mut self,
1488 chain_id: ChainId,
1489 application_id: ApplicationId,
1490 stream_name: StreamName,
1491 ) -> Result<(), ExecutionError> {
1492 let this = self.inner();
1493 ensure!(
1494 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1495 ExecutionError::StreamNameTooLong
1496 );
1497 let stream_id = StreamId {
1498 stream_name,
1499 application_id: application_id.into(),
1500 };
1501 let subscriber_app_id = this.current_application().id;
1502 this.execution_state_sender
1503 .send_request(|callback| ExecutionRequest::SubscribeToEvents {
1504 chain_id,
1505 stream_id,
1506 subscriber_app_id,
1507 callback,
1508 })?
1509 .recv_response()?;
1510 Ok(())
1511 }
1512
1513 fn unsubscribe_from_events(
1514 &mut self,
1515 chain_id: ChainId,
1516 application_id: ApplicationId,
1517 stream_name: StreamName,
1518 ) -> Result<(), ExecutionError> {
1519 let this = self.inner();
1520 ensure!(
1521 stream_name.0.len() <= MAX_STREAM_NAME_LEN,
1522 ExecutionError::StreamNameTooLong
1523 );
1524 let stream_id = StreamId {
1525 stream_name,
1526 application_id: application_id.into(),
1527 };
1528 let subscriber_app_id = this.current_application().id;
1529 this.execution_state_sender
1530 .send_request(|callback| ExecutionRequest::UnsubscribeFromEvents {
1531 chain_id,
1532 stream_id,
1533 subscriber_app_id,
1534 callback,
1535 })?
1536 .recv_response()?;
1537 Ok(())
1538 }
1539
1540 fn query_service(
1541 &mut self,
1542 application_id: ApplicationId,
1543 query: Vec<u8>,
1544 ) -> Result<Vec<u8>, ExecutionError> {
1545 let mut this = self.inner();
1546
1547 let app_permissions = this
1548 .execution_state_sender
1549 .send_request(|callback| ExecutionRequest::GetApplicationPermissions { callback })?
1550 .recv_response()?;
1551
1552 let app_id = this.current_application().id;
1553 ensure!(
1554 app_permissions.can_call_services(&app_id),
1555 ExecutionError::UnauthorizedApplication(app_id)
1556 );
1557
1558 this.resource_controller.track_service_oracle_call()?;
1559
1560 this.run_service_oracle_query(application_id, query)
1561 }
1562
1563 fn open_chain(
1564 &mut self,
1565 ownership: ChainOwnership,
1566 application_permissions: ApplicationPermissions,
1567 balance: Amount,
1568 ) -> Result<ChainId, ExecutionError> {
1569 let parent_id = self.inner().chain_id;
1570 let block_height = self.block_height()?;
1571
1572 let timestamp = self.inner().user_context;
1573
1574 let chain_id = self
1575 .inner()
1576 .execution_state_sender
1577 .send_request(|callback| ExecutionRequest::OpenChain {
1578 ownership,
1579 balance,
1580 parent_id,
1581 block_height,
1582 timestamp,
1583 application_permissions,
1584 callback,
1585 })?
1586 .recv_response()?;
1587
1588 Ok(chain_id)
1589 }
1590
1591 fn close_chain(&mut self) -> Result<(), ExecutionError> {
1592 let this = self.inner();
1593 let application_id = this.current_application().id;
1594 this.execution_state_sender
1595 .send_request(|callback| ExecutionRequest::CloseChain {
1596 application_id,
1597 callback,
1598 })?
1599 .recv_response()?
1600 }
1601
1602 fn change_ownership(&mut self, ownership: ChainOwnership) -> Result<(), ExecutionError> {
1603 let this = self.inner();
1604 let application_id = this.current_application().id;
1605 this.execution_state_sender
1606 .send_request(|callback| ExecutionRequest::ChangeOwnership {
1607 application_id,
1608 ownership,
1609 callback,
1610 })?
1611 .recv_response()?
1612 }
1613
1614 fn change_application_permissions(
1615 &mut self,
1616 application_permissions: ApplicationPermissions,
1617 ) -> Result<(), ExecutionError> {
1618 let this = self.inner();
1619 let application_id = this.current_application().id;
1620 this.execution_state_sender
1621 .send_request(|callback| ExecutionRequest::ChangeApplicationPermissions {
1622 application_id,
1623 application_permissions,
1624 callback,
1625 })?
1626 .recv_response()?
1627 }
1628
1629 fn peek_application_index(&mut self) -> Result<u32, ExecutionError> {
1630 let index = self
1631 .inner()
1632 .execution_state_sender
1633 .send_request(move |callback| ExecutionRequest::PeekApplicationIndex { callback })?
1634 .recv_response()?;
1635 Ok(index)
1636 }
1637
1638 fn create_application(
1639 &mut self,
1640 module_id: ModuleId,
1641 parameters: Vec<u8>,
1642 argument: Vec<u8>,
1643 required_application_ids: Vec<ApplicationId>,
1644 ) -> Result<ApplicationId, ExecutionError> {
1645 let chain_id = self.inner().chain_id;
1646 let block_height = self.block_height()?;
1647
1648 let CreateApplicationResult { app_id } = self
1649 .inner()
1650 .execution_state_sender
1651 .send_request(move |callback| ExecutionRequest::CreateApplication {
1652 chain_id,
1653 block_height,
1654 module_id,
1655 parameters,
1656 required_application_ids,
1657 callback,
1658 })?
1659 .recv_response()?;
1660
1661 let contract = self.inner().prepare_for_call(self.clone(), true, app_id)?;
1662
1663 contract
1664 .try_lock()
1665 .expect("Applications should not have reentrant calls")
1666 .instantiate(argument)?;
1667
1668 self.inner().finish_call();
1669
1670 Ok(app_id)
1671 }
1672
1673 fn create_data_blob(&mut self, bytes: Vec<u8>) -> Result<DataBlobHash, ExecutionError> {
1674 let blob = Blob::new_data(bytes);
1675 let blob_id = blob.id();
1676 let this = self.inner();
1677 this.execution_state_sender
1678 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1679 .recv_response()?;
1680 Ok(DataBlobHash(blob_id.hash))
1681 }
1682
1683 fn publish_module(
1684 &mut self,
1685 contract: Bytecode,
1686 service: Bytecode,
1687 vm_runtime: VmRuntime,
1688 formats: Option<Vec<u8>>,
1689 ) -> Result<ModuleId, ExecutionError> {
1690 let (blobs, module_id) = crate::runtime::create_bytecode_blobs_sync(
1691 &contract,
1692 &service,
1693 vm_runtime,
1694 formats.as_deref(),
1695 );
1696 let this = self.inner();
1697 for blob in blobs {
1698 this.execution_state_sender
1699 .send_request(|callback| ExecutionRequest::AddCreatedBlob { blob, callback })?
1700 .recv_response()?;
1701 }
1702 Ok(module_id)
1703 }
1704
1705 fn validation_round(&mut self) -> Result<Option<u32>, ExecutionError> {
1706 let this = self.inner();
1707 let round = this.round;
1708 this.execution_state_sender
1709 .send_request(|callback| ExecutionRequest::ValidationRound { round, callback })?
1710 .recv_response()
1711 }
1712
1713 fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
1714 let mut this = self.inner();
1715 let id = this.current_application().id;
1716 let state = this.view_user_states.entry(id).or_default();
1717 state.force_all_pending_queries()?;
1718 this.resource_controller.track_write_operations(
1719 batch
1720 .num_operations()
1721 .try_into()
1722 .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
1723 )?;
1724 this.resource_controller
1725 .track_bytes_written(batch.size() as u64)?;
1726 this.execution_state_sender
1727 .send_request(|callback| ExecutionRequest::WriteBatch {
1728 id,
1729 batch,
1730 callback,
1731 })?
1732 .recv_response()?;
1733 Ok(())
1734 }
1735}
1736
1737impl ServiceSyncRuntime {
1738 pub fn new(execution_state_sender: ExecutionStateSender, context: QueryContext) -> Self {
1740 Self::new_with_deadline(execution_state_sender, context, None)
1741 }
1742
1743 pub fn new_with_deadline(
1745 execution_state_sender: ExecutionStateSender,
1746 context: QueryContext,
1747 deadline: Option<Instant>,
1748 ) -> Self {
1749 let allow_application_logs = execution_state_sender
1751 .send_request(|callback| ExecutionRequest::AllowApplicationLogs { callback })
1752 .ok()
1753 .and_then(|receiver| receiver.recv_response().ok())
1754 .unwrap_or(false);
1755
1756 let runtime = SyncRuntime(Some(
1757 SyncRuntimeInternal::new(
1758 context.chain_id,
1759 context.next_block_height,
1760 None,
1761 None,
1762 execution_state_sender,
1763 deadline,
1764 None,
1765 ResourceController::default(),
1766 (),
1767 allow_application_logs,
1768 )
1769 .into(),
1770 ));
1771
1772 ServiceSyncRuntime {
1773 runtime,
1774 current_context: context,
1775 }
1776 }
1777
1778 pub(crate) fn preload_service(
1780 &self,
1781 id: ApplicationId,
1782 code: UserServiceCode,
1783 description: ApplicationDescription,
1784 ) {
1785 let this = self
1786 .runtime
1787 .0
1788 .as_ref()
1789 .expect("services shouldn't be preloaded while the runtime is being dropped");
1790 let mut this_guard = this.inner();
1791
1792 if let hash_map::Entry::Vacant(entry) = this_guard.preloaded_applications.entry(id) {
1793 entry.insert((code, description));
1794 }
1795 }
1796
1797 pub fn run(&mut self, incoming_requests: &std::sync::mpsc::Receiver<ServiceRuntimeRequest>) {
1799 while let Ok(request) = incoming_requests.recv() {
1800 let ServiceRuntimeRequest::Query {
1801 application_id,
1802 context,
1803 query,
1804 callback,
1805 } = request;
1806
1807 let result = self
1808 .prepare_for_query(context)
1809 .and_then(|()| self.run_query(application_id, query));
1810
1811 if let Err(err) = callback.send(result) {
1812 tracing::debug!(%err, "Receiver for query result has been dropped");
1813 }
1814 }
1815 }
1816
1817 pub(crate) fn prepare_for_query(
1819 &mut self,
1820 new_context: QueryContext,
1821 ) -> Result<(), ExecutionError> {
1822 let expected_context = QueryContext {
1823 local_time: new_context.local_time,
1824 ..self.current_context
1825 };
1826
1827 if new_context != expected_context {
1828 let execution_state_sender = self.handle_mut().inner().execution_state_sender.clone();
1829 *self = ServiceSyncRuntime::new(execution_state_sender, new_context);
1830 } else {
1831 self.handle_mut()
1832 .inner()
1833 .execution_state_sender
1834 .send_request(|callback| ExecutionRequest::SetLocalTime {
1835 local_time: new_context.local_time,
1836 callback,
1837 })?
1838 .recv_response()?;
1839 }
1840 Ok(())
1841 }
1842
1843 pub(crate) fn run_query(
1845 &mut self,
1846 application_id: ApplicationId,
1847 query: Vec<u8>,
1848 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
1849 let this = self.handle_mut();
1850 let response = this.try_query_application(application_id, query)?;
1851 let operations = mem::take(&mut this.inner().scheduled_operations);
1852
1853 Ok(QueryOutcome {
1854 response,
1855 operations,
1856 })
1857 }
1858
1859 fn handle_mut(&mut self) -> &mut ServiceSyncRuntimeHandle {
1861 self.runtime.0.as_mut().expect(
1862 "`SyncRuntimeHandle` should be available while `SyncRuntime` hasn't been dropped",
1863 )
1864 }
1865}
1866
1867impl ServiceRuntime for ServiceSyncRuntimeHandle {
1868 fn try_query_application(
1870 &mut self,
1871 queried_id: ApplicationId,
1872 argument: Vec<u8>,
1873 ) -> Result<Vec<u8>, ExecutionError> {
1874 let service = {
1875 let mut this = self.inner();
1876
1877 let application = this.load_service_instance(self.clone(), queried_id)?;
1879 this.push_application(ApplicationStatus {
1881 caller_id: None,
1882 id: queried_id,
1883 description: application.description,
1884 signer: None,
1885 });
1886 application.instance
1887 };
1888 let response = service
1889 .try_lock()
1890 .expect("Applications should not have reentrant calls")
1891 .handle_query(argument)?;
1892 self.inner().pop_application();
1893 Ok(response)
1894 }
1895
1896 fn schedule_operation(&mut self, operation: Vec<u8>) -> Result<(), ExecutionError> {
1897 let mut this = self.inner();
1898 let application_id = this.current_application().id;
1899
1900 this.scheduled_operations.push(Operation::User {
1901 application_id,
1902 bytes: operation,
1903 });
1904
1905 Ok(())
1906 }
1907
1908 fn check_execution_time(&mut self) -> Result<(), ExecutionError> {
1909 if let Some(deadline) = self.inner().deadline {
1910 if Instant::now() >= deadline {
1911 return Err(ExecutionError::MaximumServiceOracleExecutionTimeExceeded);
1912 }
1913 }
1914 Ok(())
1915 }
1916}
1917
/// A request handled by [`ServiceSyncRuntime::run`].
pub enum ServiceRuntimeRequest {
    /// Runs a query against an application's service.
    Query {
        /// The application whose service is queried.
        application_id: ApplicationId,
        /// The context the runtime is prepared for before the query runs.
        context: QueryContext,
        /// The query bytes passed to the service.
        query: Vec<u8>,
        /// The channel through which the query result, success or error, is sent back.
        callback: oneshot::Sender<Result<QueryOutcome<Vec<u8>>, ExecutionError>>,
    },
}
1927
/// The parts of a [`MessageContext`] that contracts can read through the
/// `message_is_bouncing`, `message_origin_chain_id` and `message_origin_timestamp`
/// runtime methods.
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    /// Copied from `MessageContext::is_bouncing`.
    is_bouncing: bool,
    /// Copied from `MessageContext::origin`.
    origin: ChainId,
    /// Copied from `MessageContext::origin_timestamp`.
    origin_timestamp: Timestamp,
}
1935
1936impl From<&MessageContext> for ExecutingMessage {
1937 fn from(context: &MessageContext) -> Self {
1938 ExecutingMessage {
1939 is_bouncing: context.is_bouncing,
1940 origin: context.origin,
1941 origin_timestamp: context.origin_timestamp,
1942 }
1943 }
1944}
1945
1946pub fn create_bytecode_blobs_sync(
1950 contract: &Bytecode,
1951 service: &Bytecode,
1952 vm_runtime: VmRuntime,
1953 formats: Option<&[u8]>,
1954) -> (Vec<Blob>, ModuleId) {
1955 let formats_blob = formats.map(Blob::new_application_formats);
1956 let formats_blob_hash = formats_blob.as_ref().map(|blob| blob.id().hash);
1957 let (mut blobs, module_id) = match vm_runtime {
1958 VmRuntime::Wasm => {
1959 let compressed_contract = contract.compress();
1960 let compressed_service = service.compress();
1961 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1962 let service_blob = Blob::new_service_bytecode(compressed_service);
1963 let module_id = ModuleId::new_with_formats(
1964 contract_blob.id().hash,
1965 service_blob.id().hash,
1966 vm_runtime,
1967 formats_blob_hash,
1968 );
1969 (vec![contract_blob, service_blob], module_id)
1970 }
1971 VmRuntime::Evm => {
1972 let compressed_contract = contract.compress();
1973 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1974 let module_id = ModuleId::new_with_formats(
1975 evm_contract_blob.id().hash,
1976 evm_contract_blob.id().hash,
1977 vm_runtime,
1978 formats_blob_hash,
1979 );
1980 (vec![evm_contract_blob], module_id)
1981 }
1982 };
1983 if let Some(blob) = formats_blob {
1984 blobs.push(blob);
1985 }
1986 (blobs, module_id)
1987}