1use std::{
5 collections::{BTreeMap, BTreeSet},
6 mem, vec,
7};
8
9use futures::{FutureExt, StreamExt};
10use linera_base::{
11 data_types::{Amount, BlockHeight, StreamUpdate},
12 identifiers::{Account, AccountOwner, StreamId},
13};
14use linera_views::{
15 context::Context,
16 key_value_store_view::KeyValueStoreView,
17 map_view::MapView,
18 reentrant_collection_view::HashedReentrantCollectionView,
19 views::{ClonableView, View},
20};
21use linera_views_derive::CryptoHashView;
22#[cfg(with_testing)]
23use {
24 crate::{
25 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
26 },
27 linera_base::data_types::Blob,
28 linera_views::context::MemoryContext,
29 std::sync::Arc,
30};
31
32use super::{runtime::ServiceRuntimeRequest, ExecutionRequest};
33use crate::{
34 resources::ResourceController, system::SystemExecutionStateView, ApplicationDescription,
35 ApplicationId, ContractSyncRuntime, ExecutionError, ExecutionRuntimeConfig,
36 ExecutionRuntimeContext, Message, MessageContext, MessageKind, Operation, OperationContext,
37 OutgoingMessage, ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime,
38 SystemMessage, Timestamp, TransactionTracker,
39};
40
/// A view over the execution state of a chain: the system state plus the storage of every
/// user application.
#[derive(Debug, ClonableView, CryptoHashView)]
pub struct ExecutionStateView<C> {
    /// System-level execution state.
    pub system: SystemExecutionStateView<C>,
    /// Key-value store of each user application, indexed by its `ApplicationId`.
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// A `u32` counter per `StreamId`; presumably the number of events published on each
    /// stream so far (TODO confirm against the code that updates it).
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
51
/// The channel ends used to talk to a long-lived service runtime when answering queries.
pub struct ServiceRuntimeEndpoint {
    /// Receives the `ExecutionRequest`s that must be handled while a query is in flight.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// Sends `ServiceRuntimeRequest`s (such as queries) to the service runtime.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
59
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application in a test environment.
    ///
    /// Marks the application-description, contract and service blobs as used, registers
    /// `contract` and the blobs with the test runtime context, and finally runs a
    /// `UserAction::Instantiate` action carrying `instantiation_argument` under a no-fee
    /// resource policy.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` differs from the chain ID
    /// stored in this view's test context.
    ///
    /// # Errors
    ///
    /// Returns any `ExecutionError` raised while recording the blobs or running the action.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let creator_chain_id = application_description.creator_chain_id;
        assert_eq!(creator_chain_id, self.context().extra().chain_id);

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Record all three blobs as used, in the order: description, contract, service.
        for blob_id in [description_blob.id(), contract_blob.id(), service_blob.id()] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        // Make the contract code and the blobs available to the test runtime context.
        self.context()
            .extra()
            .user_contracts()
            .insert(application_id, contract);
        let blobs_to_publish = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(blobs_to_publish).await?;

        let operation_context = OperationContext {
            chain_id: creator_chain_id,
            authenticated_signer: None,
            authenticated_caller_id: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let refund_grant_to = operation_context.refund_grant_to();
        let instantiate = UserAction::Instantiate(operation_context, instantiation_argument);

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );

        let next_message_index = 0;
        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_message_index,
            next_application_index,
            next_chain_index,
            None,
        );
        txn_tracker.add_created_blob(description_blob);

        self.run_user_action(
            application_id,
            instantiate,
            refund_grant_to,
            None,
            &mut txn_tracker,
            &mut resource_controller,
        )
        .await
    }
}
137
/// An action to run in a user application's contract, paired with the context it runs in.
pub enum UserAction {
    /// Instantiate the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// Execute a user operation, given as bytes.
    Operation(OperationContext, Vec<u8>),
    /// Execute an incoming user message, given as bytes.
    Message(MessageContext, Vec<u8>),
    /// Process a batch of stream updates.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
144
145impl UserAction {
146 pub(crate) fn signer(&self) -> Option<AccountOwner> {
147 match self {
148 UserAction::Instantiate(context, _) => context.authenticated_signer,
149 UserAction::Operation(context, _) => context.authenticated_signer,
150 UserAction::ProcessStreams(_, _) => None,
151 UserAction::Message(context, _) => context.authenticated_signer,
152 }
153 }
154
155 pub(crate) fn height(&self) -> BlockHeight {
156 match self {
157 UserAction::Instantiate(context, _) => context.height,
158 UserAction::Operation(context, _) => context.height,
159 UserAction::ProcessStreams(context, _) => context.height,
160 UserAction::Message(context, _) => context.height,
161 }
162 }
163
164 pub(crate) fn round(&self) -> Option<u32> {
165 match self {
166 UserAction::Instantiate(context, _) => context.round,
167 UserAction::Operation(context, _) => context.round,
168 UserAction::ProcessStreams(context, _) => context.round,
169 UserAction::Message(context, _) => context.round,
170 }
171 }
172
173 pub(crate) fn timestamp(&self) -> Timestamp {
174 match self {
175 UserAction::Instantiate(context, _) => context.timestamp,
176 UserAction::Operation(context, _) => context.timestamp,
177 UserAction::ProcessStreams(context, _) => context.timestamp,
178 UserAction::Message(context, _) => context.timestamp,
179 }
180 }
181}
182
183impl<C> ExecutionStateView<C>
184where
185 C: Context + Clone + Send + Sync + 'static,
186 C::Extra: ExecutionRuntimeContext,
187{
188 async fn run_user_action(
189 &mut self,
190 application_id: ApplicationId,
191 action: UserAction,
192 refund_grant_to: Option<Account>,
193 grant: Option<&mut Amount>,
194 txn_tracker: &mut TransactionTracker,
195 resource_controller: &mut ResourceController<Option<AccountOwner>>,
196 ) -> Result<(), ExecutionError> {
197 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
198 self.run_user_action_with_runtime(
199 application_id,
200 action,
201 refund_grant_to,
202 grant,
203 txn_tracker,
204 resource_controller,
205 )
206 .await
207 }
208
    /// Runs `action` for `application_id` inside a contract runtime spawned as a blocking
    /// task, serving the runtime's execution requests from this view until it finishes.
    ///
    /// On success, `txn_tracker` receives the tracker returned by the runtime (with the
    /// action's result added), and `resource_controller` is updated with the runtime's
    /// final balance and tracker.
    ///
    /// # Errors
    ///
    /// Returns an `ExecutionError` if loading the contract, handling a request, running
    /// the action, or merging the balance fails.
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.context().extra().chain_id();
        // Work on a copy of the grant here so that the caller's `grant` is still available
        // for the final balance merge below.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = resource_controller
            .with_state_and_grant(&mut self.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        // The runtime gets its own controller, built from the same policy and tracker and
        // seeded with the balance computed above.
        let controller = ResourceController::new(
            resource_controller.policy().clone(),
            resource_controller.tracker,
            initial_balance,
        );
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();
        let (code, description) = self.load_contract(application_id, txn_tracker).await?;
        // Move the tracker into the runtime, leaving a default value behind; it is put
        // back once the task has been joined.
        let txn_tracker_moved = mem::take(txn_tracker);
        let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
            let runtime = ContractSyncRuntime::new(
                execution_state_sender,
                chain_id,
                refund_grant_to,
                controller,
                &action,
                txn_tracker_moved,
            );

            async move {
                let code = codes.next().await.expect("we send this immediately below");
                runtime.preload_contract(application_id, code, description)?;
                runtime.run_action(application_id, chain_id, action)
            }
        })
        .await;

        contract_runtime_task.send(code)?;

        // Serve the runtime's requests until the request channel yields `None`.
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request, resource_controller).await?;
        }

        let (result, controller, txn_tracker_moved) = contract_runtime_task.join().await?;

        *txn_tracker = txn_tracker_moved;
        txn_tracker.add_operation_result(result);

        // Reconcile the balance seen by the runtime with the caller's state and grant,
        // then adopt the runtime's tracker.
        resource_controller
            .with_state_and_grant(&mut self.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        resource_controller.tracker = controller.tracker;

        Ok(())
    }
270
271 pub async fn execute_operation(
272 &mut self,
273 context: OperationContext,
274 operation: Operation,
275 txn_tracker: &mut TransactionTracker,
276 resource_controller: &mut ResourceController<Option<AccountOwner>>,
277 ) -> Result<(), ExecutionError> {
278 assert_eq!(context.chain_id, self.context().extra().chain_id());
279 match operation {
280 Operation::System(op) => {
281 let new_application = self
282 .system
283 .execute_operation(context, *op, txn_tracker, resource_controller)
284 .await?;
285 if let Some((application_id, argument)) = new_application {
286 let user_action = UserAction::Instantiate(context, argument);
287 self.run_user_action(
288 application_id,
289 user_action,
290 context.refund_grant_to(),
291 None,
292 txn_tracker,
293 resource_controller,
294 )
295 .await?;
296 }
297 }
298 Operation::User {
299 application_id,
300 bytes,
301 } => {
302 self.run_user_action(
303 application_id,
304 UserAction::Operation(context, bytes),
305 context.refund_grant_to(),
306 None,
307 txn_tracker,
308 resource_controller,
309 )
310 .await?;
311 }
312 }
313 self.process_subscriptions(txn_tracker, resource_controller, context.into())
314 .await?;
315 Ok(())
316 }
317
318 pub async fn execute_message(
319 &mut self,
320 context: MessageContext,
321 message: Message,
322 grant: Option<&mut Amount>,
323 txn_tracker: &mut TransactionTracker,
324 resource_controller: &mut ResourceController<Option<AccountOwner>>,
325 ) -> Result<(), ExecutionError> {
326 assert_eq!(context.chain_id, self.context().extra().chain_id());
327 match message {
328 Message::System(message) => {
329 let outcome = self.system.execute_message(context, message).await?;
330 txn_tracker.add_outgoing_messages(outcome)?;
331 }
332 Message::User {
333 application_id,
334 bytes,
335 } => {
336 self.run_user_action(
337 application_id,
338 UserAction::Message(context, bytes),
339 context.refund_grant_to,
340 grant,
341 txn_tracker,
342 resource_controller,
343 )
344 .await?;
345 }
346 }
347 self.process_subscriptions(txn_tracker, resource_controller, context.into())
348 .await?;
349 Ok(())
350 }
351
352 pub async fn bounce_message(
353 &self,
354 context: MessageContext,
355 grant: Amount,
356 message: Message,
357 txn_tracker: &mut TransactionTracker,
358 ) -> Result<(), ExecutionError> {
359 assert_eq!(context.chain_id, self.context().extra().chain_id());
360 txn_tracker.add_outgoing_message(OutgoingMessage {
361 destination: context.message_id.chain_id,
362 authenticated_signer: context.authenticated_signer,
363 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
364 grant,
365 kind: MessageKind::Bouncing,
366 message,
367 })?;
368 Ok(())
369 }
370
371 pub async fn send_refund(
372 &self,
373 context: MessageContext,
374 amount: Amount,
375 txn_tracker: &mut TransactionTracker,
376 ) -> Result<(), ExecutionError> {
377 assert_eq!(context.chain_id, self.context().extra().chain_id());
378 if amount.is_zero() {
379 return Ok(());
380 }
381 let Some(account) = context.refund_grant_to else {
382 return Err(ExecutionError::InternalError(
383 "Messages with grants should have a non-empty `refund_grant_to`",
384 ));
385 };
386 let message = SystemMessage::Credit {
387 amount,
388 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
389 target: account.owner,
390 };
391 txn_tracker.add_outgoing_message(
392 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
393 )?;
394 Ok(())
395 }
396
397 pub async fn query_application(
398 &mut self,
399 context: QueryContext,
400 query: Query,
401 endpoint: Option<&mut ServiceRuntimeEndpoint>,
402 ) -> Result<QueryOutcome, ExecutionError> {
403 assert_eq!(context.chain_id, self.context().extra().chain_id());
404 match query {
405 Query::System(query) => {
406 let outcome = self.system.handle_query(context, query).await?;
407 Ok(outcome.into())
408 }
409 Query::User {
410 application_id,
411 bytes,
412 } => {
413 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
414 let outcome = match endpoint {
415 Some(endpoint) => {
416 self.query_user_application_with_long_lived_service(
417 application_id,
418 context,
419 bytes,
420 &mut endpoint.incoming_execution_requests,
421 &mut endpoint.runtime_request_sender,
422 )
423 .await?
424 }
425 None => {
426 self.query_user_application(application_id, context, bytes)
427 .await?
428 }
429 };
430 Ok(outcome.into())
431 }
432 }
433 }
434
435 async fn query_user_application(
436 &mut self,
437 application_id: ApplicationId,
438 context: QueryContext,
439 query: Vec<u8>,
440 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
441 let (execution_state_sender, mut execution_state_receiver) =
442 futures::channel::mpsc::unbounded();
443 let (code, description) = self
444 .load_service(application_id, &mut TransactionTracker::default())
445 .await?;
446
447 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
448 let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
449
450 async move {
451 let code = codes.next().await.expect("we send this immediately below");
452 runtime.preload_service(application_id, code, description)?;
453 runtime.run_query(application_id, query)
454 }
455 })
456 .await;
457
458 service_runtime_task.send(code)?;
459
460 while let Some(request) = execution_state_receiver.next().await {
461 self.handle_request(request, &mut ResourceController::default())
462 .await?;
463 }
464
465 service_runtime_task.join().await
466 }
467
468 async fn query_user_application_with_long_lived_service(
469 &mut self,
470 application_id: ApplicationId,
471 context: QueryContext,
472 query: Vec<u8>,
473 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
474 ExecutionRequest,
475 >,
476 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
477 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
478 let (outcome_sender, outcome_receiver) = oneshot::channel();
479 let mut outcome_receiver = outcome_receiver.fuse();
480
481 runtime_request_sender
482 .send(ServiceRuntimeRequest::Query {
483 application_id,
484 context,
485 query,
486 callback: outcome_sender,
487 })
488 .expect("Service runtime thread should only stop when `request_sender` is dropped");
489
490 loop {
491 futures::select! {
492 maybe_request = incoming_execution_requests.next() => {
493 if let Some(request) = maybe_request {
494 self.handle_request(request, &mut ResourceController::default()).await?;
495 }
496 }
497 outcome = &mut outcome_receiver => {
498 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
499 }
500 }
501 }
502 }
503
504 pub async fn list_applications(
505 &self,
506 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
507 let mut applications = vec![];
508 for app_id in self.users.indices().await? {
509 let blob_id = app_id.description_blob_id();
510 let blob_content = self.system.read_blob_content(blob_id).await?;
511 let application_description = bcs::from_bytes(blob_content.bytes())?;
512 applications.push((app_id, application_description));
513 }
514 Ok(applications)
515 }
516
517 async fn process_subscriptions(
520 &mut self,
521 txn_tracker: &mut TransactionTracker,
522 resource_controller: &mut ResourceController<Option<AccountOwner>>,
523 context: ProcessStreamsContext,
524 ) -> Result<(), ExecutionError> {
525 let mut processed = BTreeSet::new();
528 loop {
529 let to_process = txn_tracker
530 .take_streams_to_process()
531 .into_iter()
532 .filter_map(|(app_id, updates)| {
533 let updates = updates
534 .into_iter()
535 .filter_map(|update| {
536 if !processed.insert((
537 app_id,
538 update.chain_id,
539 update.stream_id.clone(),
540 )) {
541 return None;
542 }
543 Some(update)
544 })
545 .collect::<Vec<_>>();
546 if updates.is_empty() {
547 return None;
548 }
549 Some((app_id, updates))
550 })
551 .collect::<BTreeMap<_, _>>();
552 if to_process.is_empty() {
553 return Ok(());
554 }
555 for (app_id, updates) in to_process {
556 self.run_user_action(
557 app_id,
558 UserAction::ProcessStreams(context, updates),
559 None,
560 None,
561 txn_tracker,
562 resource_controller,
563 )
564 .await?;
565 }
566 }
567 }
568}