linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8    data_types::{BlobContent, BlockHeight, StreamUpdate},
9    identifiers::{AccountOwner, BlobId},
10    time::Instant,
11};
12use linera_views::{
13    context::Context,
14    key_value_store_view::KeyValueStoreView,
15    reentrant_collection_view::HashedReentrantCollectionView,
16    views::{ClonableView, ReplaceContext, View},
17};
18use linera_views_derive::CryptoHashView;
19#[cfg(with_testing)]
20use {
21    crate::{
22        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
23    },
24    linera_base::data_types::Blob,
25    linera_views::context::MemoryContext,
26    std::sync::Arc,
27};
28
29use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
30use crate::{
31    execution_state_actor::ExecutionStateActor, resources::ResourceController,
32    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
33    ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext, OperationContext,
34    ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
35    TransactionTracker,
36};
37
/// A view accessing the execution state of a chain.
///
/// The state is made of two parts: the view of the system application and a
/// collection holding one key-value store per user application, indexed by
/// [`ApplicationId`].
#[derive(Debug, ClonableView, CryptoHashView)]
pub struct ExecutionStateView<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications: one [`KeyValueStoreView`] per [`ApplicationId`].
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
}
46
47impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
48    type Target = ExecutionStateView<C2>;
49
50    async fn with_context(
51        &mut self,
52        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
53    ) -> Self::Target {
54        ExecutionStateView {
55            system: self.system.with_context(ctx.clone()).await,
56            users: self.users.with_context(ctx.clone()).await,
57        }
58    }
59}
60
/// How to interact with a long-lived service runtime.
///
/// Bundles the two channel ends that `ExecutionStateView::query_application` uses when
/// it is given an endpoint instead of spawning a fresh runtime for the query.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests.
    ///
    /// Receiving end of the channel of [`ExecutionRequest`]s; these are handled against
    /// the execution state while a query is in flight.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime.
    ///
    /// Sending end of the channel of [`ServiceRuntimeRequest`]s (e.g.
    /// `ServiceRuntimeRequest::Query`).
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
68
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// Records the description, contract and service blobs as used, registers
    /// `contract` and the blobs with the test runtime context, and then runs a
    /// [`UserAction::Instantiate`] with `instantiation_argument` under a fee-less
    /// resource policy.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of
    /// this view's context.
    ///
    /// # Errors
    ///
    /// Returns any error raised while recording the blobs, storing them in the test
    /// context, or running the instantiation action.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        // The instantiation runs as an unauthenticated operation at the height where
        // the application was created.
        let context = OperationContext {
            chain_id,
            authenticated_owner: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(context, instantiation_argument);

        let application_id = From::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Mark the description, contract and service blobs as used, in that order.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        // Make the contract code and the blobs available through the test context.
        let extra = self.context().extra();
        extra.user_contracts().pin().insert(application_id, contract);
        extra
            .add_blobs([
                contract_blob,
                service_blob,
                Blob::new_application_description(&application_description),
            ])
            .await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        // The next application index follows the one being instantiated; the next
        // chain index starts at zero.
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            application_description.application_index + 1,
            0,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
            .run_user_action(application_id, action, context.refund_grant_to(), None)
            .await?;

        Ok(())
    }
}
139
/// An action to run on a user application, paired with the context it runs in.
pub enum UserAction {
    /// Instantiate the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// Execute an operation; the bytes are presumably the serialized operation.
    Operation(OperationContext, Vec<u8>),
    /// Execute a message; the bytes are presumably the serialized message.
    Message(MessageContext, Vec<u8>),
    /// Process a batch of stream updates.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
146
147impl UserAction {
148    pub(crate) fn signer(&self) -> Option<AccountOwner> {
149        match self {
150            UserAction::Instantiate(context, _) => context.authenticated_owner,
151            UserAction::Operation(context, _) => context.authenticated_owner,
152            UserAction::ProcessStreams(_, _) => None,
153            UserAction::Message(context, _) => context.authenticated_owner,
154        }
155    }
156
157    pub(crate) fn height(&self) -> BlockHeight {
158        match self {
159            UserAction::Instantiate(context, _) => context.height,
160            UserAction::Operation(context, _) => context.height,
161            UserAction::ProcessStreams(context, _) => context.height,
162            UserAction::Message(context, _) => context.height,
163        }
164    }
165
166    pub(crate) fn round(&self) -> Option<u32> {
167        match self {
168            UserAction::Instantiate(context, _) => context.round,
169            UserAction::Operation(context, _) => context.round,
170            UserAction::ProcessStreams(context, _) => context.round,
171            UserAction::Message(context, _) => context.round,
172        }
173    }
174
175    pub(crate) fn timestamp(&self) -> Timestamp {
176        match self {
177            UserAction::Instantiate(context, _) => context.timestamp,
178            UserAction::Operation(context, _) => context.timestamp,
179            UserAction::ProcessStreams(context, _) => context.timestamp,
180            UserAction::Message(context, _) => context.timestamp,
181        }
182    }
183}
184
185impl<C> ExecutionStateView<C>
186where
187    C: Context + Clone + Send + Sync + 'static,
188    C::Extra: ExecutionRuntimeContext,
189{
190    pub async fn query_application(
191        &mut self,
192        context: QueryContext,
193        query: Query,
194        endpoint: Option<&mut ServiceRuntimeEndpoint>,
195    ) -> Result<QueryOutcome, ExecutionError> {
196        assert_eq!(context.chain_id, self.context().extra().chain_id());
197        match query {
198            Query::System(query) => {
199                let outcome = self.system.handle_query(context, query);
200                Ok(outcome.into())
201            }
202            Query::User {
203                application_id,
204                bytes,
205            } => {
206                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
207                let outcome = match endpoint {
208                    Some(endpoint) => {
209                        self.query_user_application_with_long_lived_service(
210                            application_id,
211                            context,
212                            bytes,
213                            &mut endpoint.incoming_execution_requests,
214                            &mut endpoint.runtime_request_sender,
215                        )
216                        .await?
217                    }
218                    None => {
219                        self.query_user_application(application_id, context, bytes)
220                            .await?
221                    }
222                };
223                Ok(outcome.into())
224            }
225        }
226    }
227
228    async fn query_user_application(
229        &mut self,
230        application_id: ApplicationId,
231        context: QueryContext,
232        query: Vec<u8>,
233    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
234        self.query_user_application_with_deadline(
235            application_id,
236            context,
237            query,
238            None,
239            BTreeMap::new(),
240        )
241        .await
242    }
243
244    pub(crate) async fn query_user_application_with_deadline(
245        &mut self,
246        application_id: ApplicationId,
247        context: QueryContext,
248        query: Vec<u8>,
249        deadline: Option<Instant>,
250        created_blobs: BTreeMap<BlobId, BlobContent>,
251    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
252        let (execution_state_sender, mut execution_state_receiver) =
253            futures::channel::mpsc::unbounded();
254        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
255        let mut resource_controller = ResourceController::default();
256        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
257        let (code, description) = actor.load_service(application_id).await?;
258
259        let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
260            let mut runtime =
261                ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
262
263            async move {
264                let code = codes.next().await.expect("we send this immediately below");
265                runtime.preload_service(application_id, code, description)?;
266                runtime.run_query(application_id, query)
267            }
268        })
269        .await;
270
271        service_runtime_task.send(code)?;
272
273        while let Some(request) = execution_state_receiver.next().await {
274            actor.handle_request(request).await?;
275        }
276
277        service_runtime_task.join().await
278    }
279
280    async fn query_user_application_with_long_lived_service(
281        &mut self,
282        application_id: ApplicationId,
283        context: QueryContext,
284        query: Vec<u8>,
285        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
286            ExecutionRequest,
287        >,
288        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
289    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
290        let (outcome_sender, outcome_receiver) = oneshot::channel();
291        let mut outcome_receiver = outcome_receiver.fuse();
292
293        runtime_request_sender
294            .send(ServiceRuntimeRequest::Query {
295                application_id,
296                context,
297                query,
298                callback: outcome_sender,
299            })
300            .expect("Service runtime thread should only stop when `request_sender` is dropped");
301
302        let mut txn_tracker = TransactionTracker::default();
303        let mut resource_controller = ResourceController::default();
304        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
305
306        loop {
307            futures::select! {
308                maybe_request = incoming_execution_requests.next() => {
309                    if let Some(request) = maybe_request {
310                        actor.handle_request(request).await?;
311                    }
312                }
313                outcome = &mut outcome_receiver => {
314                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
315                }
316            }
317        }
318    }
319
320    pub async fn list_applications(
321        &self,
322    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
323        let mut applications = vec![];
324        for app_id in self.users.indices().await? {
325            let blob_id = app_id.description_blob_id();
326            let blob_content = self.system.read_blob_content(blob_id).await?;
327            let application_description = bcs::from_bytes(blob_content.bytes())?;
328            applications.push((app_id, application_description));
329        }
330        Ok(applications)
331    }
332}