linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    mem, vec,
7};
8
9use futures::{FutureExt, StreamExt};
10use linera_base::{
11    data_types::{Amount, BlockHeight, StreamUpdate},
12    identifiers::{Account, AccountOwner, StreamId},
13};
14use linera_views::{
15    context::Context,
16    key_value_store_view::KeyValueStoreView,
17    map_view::MapView,
18    reentrant_collection_view::HashedReentrantCollectionView,
19    views::{ClonableView, View},
20};
21use linera_views_derive::CryptoHashView;
22#[cfg(with_testing)]
23use {
24    crate::{
25        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
26    },
27    linera_base::data_types::Blob,
28    linera_views::context::MemoryContext,
29    std::sync::Arc,
30};
31
32use super::{runtime::ServiceRuntimeRequest, ExecutionRequest};
33use crate::{
34    resources::ResourceController, system::SystemExecutionStateView, ApplicationDescription,
35    ApplicationId, ContractSyncRuntime, ExecutionError, ExecutionRuntimeConfig,
36    ExecutionRuntimeContext, Message, MessageContext, MessageKind, Operation, OperationContext,
37    OutgoingMessage, ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime,
38    SystemMessage, Timestamp, TransactionTracker,
39};
40
/// A view accessing the execution state of a chain.
///
/// It groups the state of the system application, one key-value store per user
/// application, and the event counters of the streams written by the chain.
#[derive(Debug, ClonableView, CryptoHashView)]
pub struct ExecutionStateView<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications: one key-value store per application, indexed by `ApplicationId`.
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The number of events in the streams that this chain is writing to.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
51
/// How to interact with a long-lived service runtime.
///
/// When an endpoint is passed to `ExecutionStateView::query_application`, user queries
/// are sent to that runtime instead of spawning a new service runtime for the query.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests.
    ///
    /// These are the `ExecutionRequest`s that are handled against the execution state
    /// while a query is in flight.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime.
    ///
    /// Queries are submitted as `ServiceRuntimeRequest` values over this channel.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
59
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// The description, contract and service blobs are recorded as used, the contract
    /// code and the blobs are registered with the test runtime context, and the
    /// `Instantiate` action is then run with the `no_fees` resource policy.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of
    /// this view's context.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let creator_chain = application_description.creator_chain_id;
        assert_eq!(creator_chain, self.context().extra().chain_id);

        let operation_context = OperationContext {
            chain_id: creator_chain,
            authenticated_signer: None,
            authenticated_caller_id: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let instantiate = UserAction::Instantiate(operation_context, instantiation_argument);

        // Starting indices handed to the transaction tracker further down.
        let first_message_index = 0;
        let following_application_index = application_description.application_index + 1;
        let first_chain_index = 0;

        let application_id = From::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Record the description, contract and service blobs as used, in that order.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        self.context()
            .extra()
            .user_contracts()
            .insert(application_id, contract);

        let registered_blobs = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(registered_blobs).await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            first_message_index,
            following_application_index,
            first_chain_index,
            None,
        );
        txn_tracker.add_created_blob(description_blob);

        let refund_account = operation_context.refund_grant_to();
        self.run_user_action(
            application_id,
            instantiate,
            refund_account,
            None,
            &mut txn_tracker,
            &mut resource_controller,
        )
        .await
    }
}
137
/// An action to run on the contract of a user application.
///
/// Each variant pairs the context in which the action runs with its payload.
pub enum UserAction {
    /// Instantiates the application with the given argument bytes.
    Instantiate(OperationContext, Vec<u8>),
    /// Executes a user operation given as bytes.
    Operation(OperationContext, Vec<u8>),
    /// Executes a user message given as bytes.
    Message(MessageContext, Vec<u8>),
    /// Hands a list of stream updates to the application.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
144
145impl UserAction {
146    pub(crate) fn signer(&self) -> Option<AccountOwner> {
147        match self {
148            UserAction::Instantiate(context, _) => context.authenticated_signer,
149            UserAction::Operation(context, _) => context.authenticated_signer,
150            UserAction::ProcessStreams(_, _) => None,
151            UserAction::Message(context, _) => context.authenticated_signer,
152        }
153    }
154
155    pub(crate) fn height(&self) -> BlockHeight {
156        match self {
157            UserAction::Instantiate(context, _) => context.height,
158            UserAction::Operation(context, _) => context.height,
159            UserAction::ProcessStreams(context, _) => context.height,
160            UserAction::Message(context, _) => context.height,
161        }
162    }
163
164    pub(crate) fn round(&self) -> Option<u32> {
165        match self {
166            UserAction::Instantiate(context, _) => context.round,
167            UserAction::Operation(context, _) => context.round,
168            UserAction::ProcessStreams(context, _) => context.round,
169            UserAction::Message(context, _) => context.round,
170        }
171    }
172
173    pub(crate) fn timestamp(&self) -> Timestamp {
174        match self {
175            UserAction::Instantiate(context, _) => context.timestamp,
176            UserAction::Operation(context, _) => context.timestamp,
177            UserAction::ProcessStreams(context, _) => context.timestamp,
178            UserAction::Message(context, _) => context.timestamp,
179        }
180    }
181}
182
183impl<C> ExecutionStateView<C>
184where
185    C: Context + Clone + Send + Sync + 'static,
186    C::Extra: ExecutionRuntimeContext,
187{
188    async fn run_user_action(
189        &mut self,
190        application_id: ApplicationId,
191        action: UserAction,
192        refund_grant_to: Option<Account>,
193        grant: Option<&mut Amount>,
194        txn_tracker: &mut TransactionTracker,
195        resource_controller: &mut ResourceController<Option<AccountOwner>>,
196    ) -> Result<(), ExecutionError> {
197        let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
198        self.run_user_action_with_runtime(
199            application_id,
200            action,
201            refund_grant_to,
202            grant,
203            txn_tracker,
204            resource_controller,
205        )
206        .await
207    }
208
    /// Runs `action` on the contract of `application_id` in a blocking runtime task and
    /// serves the execution requests emitted by that runtime.
    ///
    /// The runtime works with its own `ResourceController`, initialized with a snapshot of
    /// the current balance, and with the transaction tracker moved out of `txn_tracker`.
    /// Once the task has finished, the tracker is put back, the operation result is added
    /// to it, and the balance and resource tracker are merged back into
    /// `resource_controller`.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the contract, handing the code to the task, handling a
    /// request, the runtime task itself, or the balance bookkeeping fails.
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.context().extra().chain_id();
        // Read the balance through a copy of the grant, so that the `grant` reference
        // itself remains available for the final `merge_balance` call below.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = resource_controller
            .with_state_and_grant(&mut self.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        // Controller handed to the runtime: same policy and tracker, initialized with the
        // balance snapshot.
        let controller = ResourceController::new(
            resource_controller.policy().clone(),
            resource_controller.tracker,
            initial_balance,
        );
        // Channel over which the runtime sends its execution requests back to this view.
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();
        let (code, description) = self.load_contract(application_id, txn_tracker).await?;
        // Move the tracker into the runtime task, leaving a default value behind.
        // NOTE(review): if this function returns early with an error before the tracker is
        // restored below, `*txn_tracker` stays in that default state.
        let txn_tracker_moved = mem::take(txn_tracker);
        let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
            let runtime = ContractSyncRuntime::new(
                execution_state_sender,
                chain_id,
                refund_grant_to,
                controller,
                &action,
                txn_tracker_moved,
            );

            async move {
                // The contract code arrives through `codes`; it is sent right after the
                // task has been spawned.
                let code = codes.next().await.expect("we send this immediately below");
                runtime.preload_contract(application_id, code, description)?;
                runtime.run_action(application_id, chain_id, action)
            }
        })
        .await;

        contract_runtime_task.send(code)?;

        // Serve the runtime's requests until the request channel yields no more items.
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request, resource_controller).await?;
        }

        let (result, controller, txn_tracker_moved) = contract_runtime_task.join().await?;

        // Put the tracker back and record the result of the action.
        *txn_tracker = txn_tracker_moved;
        txn_tracker.add_operation_result(result);

        // Merge the runtime's final balance and tracker back, this time using the real
        // `grant`.
        resource_controller
            .with_state_and_grant(&mut self.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        resource_controller.tracker = controller.tracker;

        Ok(())
    }
270
271    pub async fn execute_operation(
272        &mut self,
273        context: OperationContext,
274        operation: Operation,
275        txn_tracker: &mut TransactionTracker,
276        resource_controller: &mut ResourceController<Option<AccountOwner>>,
277    ) -> Result<(), ExecutionError> {
278        assert_eq!(context.chain_id, self.context().extra().chain_id());
279        match operation {
280            Operation::System(op) => {
281                let new_application = self
282                    .system
283                    .execute_operation(context, *op, txn_tracker, resource_controller)
284                    .await?;
285                if let Some((application_id, argument)) = new_application {
286                    let user_action = UserAction::Instantiate(context, argument);
287                    self.run_user_action(
288                        application_id,
289                        user_action,
290                        context.refund_grant_to(),
291                        None,
292                        txn_tracker,
293                        resource_controller,
294                    )
295                    .await?;
296                }
297            }
298            Operation::User {
299                application_id,
300                bytes,
301            } => {
302                self.run_user_action(
303                    application_id,
304                    UserAction::Operation(context, bytes),
305                    context.refund_grant_to(),
306                    None,
307                    txn_tracker,
308                    resource_controller,
309                )
310                .await?;
311            }
312        }
313        self.process_subscriptions(txn_tracker, resource_controller, context.into())
314            .await?;
315        Ok(())
316    }
317
318    pub async fn execute_message(
319        &mut self,
320        context: MessageContext,
321        message: Message,
322        grant: Option<&mut Amount>,
323        txn_tracker: &mut TransactionTracker,
324        resource_controller: &mut ResourceController<Option<AccountOwner>>,
325    ) -> Result<(), ExecutionError> {
326        assert_eq!(context.chain_id, self.context().extra().chain_id());
327        match message {
328            Message::System(message) => {
329                let outcome = self.system.execute_message(context, message).await?;
330                txn_tracker.add_outgoing_messages(outcome)?;
331            }
332            Message::User {
333                application_id,
334                bytes,
335            } => {
336                self.run_user_action(
337                    application_id,
338                    UserAction::Message(context, bytes),
339                    context.refund_grant_to,
340                    grant,
341                    txn_tracker,
342                    resource_controller,
343                )
344                .await?;
345            }
346        }
347        self.process_subscriptions(txn_tracker, resource_controller, context.into())
348            .await?;
349        Ok(())
350    }
351
352    pub async fn bounce_message(
353        &self,
354        context: MessageContext,
355        grant: Amount,
356        message: Message,
357        txn_tracker: &mut TransactionTracker,
358    ) -> Result<(), ExecutionError> {
359        assert_eq!(context.chain_id, self.context().extra().chain_id());
360        txn_tracker.add_outgoing_message(OutgoingMessage {
361            destination: context.message_id.chain_id,
362            authenticated_signer: context.authenticated_signer,
363            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
364            grant,
365            kind: MessageKind::Bouncing,
366            message,
367        })?;
368        Ok(())
369    }
370
371    pub async fn send_refund(
372        &self,
373        context: MessageContext,
374        amount: Amount,
375        txn_tracker: &mut TransactionTracker,
376    ) -> Result<(), ExecutionError> {
377        assert_eq!(context.chain_id, self.context().extra().chain_id());
378        if amount.is_zero() {
379            return Ok(());
380        }
381        let Some(account) = context.refund_grant_to else {
382            return Err(ExecutionError::InternalError(
383                "Messages with grants should have a non-empty `refund_grant_to`",
384            ));
385        };
386        let message = SystemMessage::Credit {
387            amount,
388            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
389            target: account.owner,
390        };
391        txn_tracker.add_outgoing_message(
392            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
393        )?;
394        Ok(())
395    }
396
397    pub async fn query_application(
398        &mut self,
399        context: QueryContext,
400        query: Query,
401        endpoint: Option<&mut ServiceRuntimeEndpoint>,
402    ) -> Result<QueryOutcome, ExecutionError> {
403        assert_eq!(context.chain_id, self.context().extra().chain_id());
404        match query {
405            Query::System(query) => {
406                let outcome = self.system.handle_query(context, query).await?;
407                Ok(outcome.into())
408            }
409            Query::User {
410                application_id,
411                bytes,
412            } => {
413                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
414                let outcome = match endpoint {
415                    Some(endpoint) => {
416                        self.query_user_application_with_long_lived_service(
417                            application_id,
418                            context,
419                            bytes,
420                            &mut endpoint.incoming_execution_requests,
421                            &mut endpoint.runtime_request_sender,
422                        )
423                        .await?
424                    }
425                    None => {
426                        self.query_user_application(application_id, context, bytes)
427                            .await?
428                    }
429                };
430                Ok(outcome.into())
431            }
432        }
433    }
434
435    async fn query_user_application(
436        &mut self,
437        application_id: ApplicationId,
438        context: QueryContext,
439        query: Vec<u8>,
440    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
441        let (execution_state_sender, mut execution_state_receiver) =
442            futures::channel::mpsc::unbounded();
443        let (code, description) = self
444            .load_service(application_id, &mut TransactionTracker::default())
445            .await?;
446
447        let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
448            let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
449
450            async move {
451                let code = codes.next().await.expect("we send this immediately below");
452                runtime.preload_service(application_id, code, description)?;
453                runtime.run_query(application_id, query)
454            }
455        })
456        .await;
457
458        service_runtime_task.send(code)?;
459
460        while let Some(request) = execution_state_receiver.next().await {
461            self.handle_request(request, &mut ResourceController::default())
462                .await?;
463        }
464
465        service_runtime_task.join().await
466    }
467
    /// Answers a user query through an already running service runtime.
    ///
    /// The query is sent to the runtime together with a one-shot callback. While waiting
    /// for the outcome, execution requests arriving on `incoming_execution_requests` are
    /// handled by this view.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::MissingRuntimeResponse` if the callback is dropped without
    /// an answer, and propagates errors from request handling or from the query itself.
    ///
    /// # Panics
    ///
    /// Panics if the query cannot be sent through `runtime_request_sender`.
    async fn query_user_application_with_long_lived_service(
        &mut self,
        application_id: ApplicationId,
        context: QueryContext,
        query: Vec<u8>,
        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
            ExecutionRequest,
        >,
        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
        // One-shot channel on which the runtime reports the outcome of this query.
        let (outcome_sender, outcome_receiver) = oneshot::channel();
        let mut outcome_receiver = outcome_receiver.fuse();

        runtime_request_sender
            .send(ServiceRuntimeRequest::Query {
                application_id,
                context,
                query,
                callback: outcome_sender,
            })
            .expect("Service runtime thread should only stop when `request_sender` is dropped");

        // Alternate between serving execution requests and checking for the outcome; the
        // loop is only left by returning the outcome or a request-handling error.
        loop {
            futures::select! {
                maybe_request = incoming_execution_requests.next() => {
                    if let Some(request) = maybe_request {
                        self.handle_request(request, &mut ResourceController::default()).await?;
                    }
                }
                outcome = &mut outcome_receiver => {
                    // A failed receive becomes `MissingRuntimeResponse`; otherwise the
                    // runtime's own result is returned unchanged.
                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
                }
            }
        }
    }
503
504    pub async fn list_applications(
505        &self,
506    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
507        let mut applications = vec![];
508        for app_id in self.users.indices().await? {
509            let blob_id = app_id.description_blob_id();
510            let blob_content = self.system.read_blob_content(blob_id).await?;
511            let application_description = bcs::from_bytes(blob_content.bytes())?;
512            applications.push((app_id, application_description));
513        }
514        Ok(applications)
515    }
516
517    /// Calls `process_streams` for all applications that are subscribed to streams with new
518    /// events or that have new subscriptions.
519    async fn process_subscriptions(
520        &mut self,
521        txn_tracker: &mut TransactionTracker,
522        resource_controller: &mut ResourceController<Option<AccountOwner>>,
523        context: ProcessStreamsContext,
524    ) -> Result<(), ExecutionError> {
525        // Keep track of which streams we have already processed. This is to guard against
526        // applications unsubscribing and subscribing in the process_streams call itself.
527        let mut processed = BTreeSet::new();
528        loop {
529            let to_process = txn_tracker
530                .take_streams_to_process()
531                .into_iter()
532                .filter_map(|(app_id, updates)| {
533                    let updates = updates
534                        .into_iter()
535                        .filter_map(|update| {
536                            if !processed.insert((
537                                app_id,
538                                update.chain_id,
539                                update.stream_id.clone(),
540                            )) {
541                                return None;
542                            }
543                            Some(update)
544                        })
545                        .collect::<Vec<_>>();
546                    if updates.is_empty() {
547                        return None;
548                    }
549                    Some((app_id, updates))
550                })
551                .collect::<BTreeMap<_, _>>();
552            if to_process.is_empty() {
553                return Ok(());
554            }
555            for (app_id, updates) in to_process {
556                self.run_user_action(
557                    app_id,
558                    UserAction::ProcessStreams(context, updates),
559                    None,
560                    None,
561                    txn_tracker,
562                    resource_controller,
563                )
564                .await?;
565            }
566        }
567    }
568}