linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8    data_types::{BlobContent, BlockHeight, StreamUpdate},
9    identifiers::{AccountOwner, BlobId, StreamId},
10    time::Instant,
11};
12use linera_views::{
13    context::Context,
14    key_value_store_view::KeyValueStoreView,
15    map_view::MapView,
16    reentrant_collection_view::HashedReentrantCollectionView,
17    views::{ClonableView, ReplaceContext, View},
18};
19use linera_views_derive::CryptoHashView;
20#[cfg(with_testing)]
21use {
22    crate::{
23        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
24    },
25    linera_base::data_types::Blob,
26    linera_views::context::MemoryContext,
27    std::sync::Arc,
28};
29
30use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
31use crate::{
32    execution_state_actor::ExecutionStateActor, resources::ResourceController,
33    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
34    ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext, OperationContext,
35    ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
36    TransactionTracker,
37};
38
/// A view accessing the execution state of a chain.
///
/// Groups three sub-views that all share the same storage context `C`: the system
/// application's state, the per-application user states, and the per-stream event
/// counters.
#[derive(Debug, ClonableView, CryptoHashView)]
pub struct ExecutionStateView<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications, keyed by application ID; each entry is a key-value store view.
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The number of events in the streams that this chain is writing to.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
49
50impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
51    type Target = ExecutionStateView<C2>;
52
53    async fn with_context(
54        &mut self,
55        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
56    ) -> Self::Target {
57        ExecutionStateView {
58            system: self.system.with_context(ctx.clone()).await,
59            users: self.users.with_context(ctx.clone()).await,
60            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
61        }
62    }
63}
64
/// How to interact with a long-lived service runtime.
///
/// Bundles the two channel ends used when querying such a runtime: execution
/// requests arrive on an asynchronous `futures` channel, while runtime requests are
/// submitted over a synchronous `std::sync::mpsc` channel.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
72
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// Records the application description, contract and service blobs as used,
    /// registers the contract code and the blobs with the test runtime context, and
    /// then runs an `Instantiate` user action under a fee-less resource policy.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of
    /// this view's test runtime context.
    ///
    /// # Errors
    ///
    /// Returns an error if recording a used blob, adding the blobs to the runtime
    /// context, or running the instantiation action fails.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        let context = OperationContext {
            chain_id,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let refund_grant_to = context.refund_grant_to();
        let action = UserAction::Instantiate(context, instantiation_argument);

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Mark the description, contract and service blobs as used, in that order.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        self.context()
            .extra()
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        self.context()
            .extra()
            .add_blobs([
                contract_blob,
                service_blob,
                Blob::new_application_description(&application_description),
            ])
            .await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );

        // The tracker starts right after the index of the application being instantiated.
        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_application_index,
            next_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
            .run_user_action(application_id, action, refund_grant_to, None)
            .await?;

        Ok(())
    }
}
143
/// An action to run on a user application, paired with the context it executes in.
pub enum UserAction {
    /// Instantiates the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// An operation with a byte payload (presumably the serialized operation; confirm
    /// against callers).
    Operation(OperationContext, Vec<u8>),
    /// A message with a byte payload (presumably the serialized message; confirm
    /// against callers).
    Message(MessageContext, Vec<u8>),
    /// A batch of stream updates to process.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
150
151impl UserAction {
152    pub(crate) fn signer(&self) -> Option<AccountOwner> {
153        match self {
154            UserAction::Instantiate(context, _) => context.authenticated_signer,
155            UserAction::Operation(context, _) => context.authenticated_signer,
156            UserAction::ProcessStreams(_, _) => None,
157            UserAction::Message(context, _) => context.authenticated_signer,
158        }
159    }
160
161    pub(crate) fn height(&self) -> BlockHeight {
162        match self {
163            UserAction::Instantiate(context, _) => context.height,
164            UserAction::Operation(context, _) => context.height,
165            UserAction::ProcessStreams(context, _) => context.height,
166            UserAction::Message(context, _) => context.height,
167        }
168    }
169
170    pub(crate) fn round(&self) -> Option<u32> {
171        match self {
172            UserAction::Instantiate(context, _) => context.round,
173            UserAction::Operation(context, _) => context.round,
174            UserAction::ProcessStreams(context, _) => context.round,
175            UserAction::Message(context, _) => context.round,
176        }
177    }
178
179    pub(crate) fn timestamp(&self) -> Timestamp {
180        match self {
181            UserAction::Instantiate(context, _) => context.timestamp,
182            UserAction::Operation(context, _) => context.timestamp,
183            UserAction::ProcessStreams(context, _) => context.timestamp,
184            UserAction::Message(context, _) => context.timestamp,
185        }
186    }
187}
188
189impl<C> ExecutionStateView<C>
190where
191    C: Context + Clone + Send + Sync + 'static,
192    C::Extra: ExecutionRuntimeContext,
193{
194    pub async fn query_application(
195        &mut self,
196        context: QueryContext,
197        query: Query,
198        endpoint: Option<&mut ServiceRuntimeEndpoint>,
199    ) -> Result<QueryOutcome, ExecutionError> {
200        assert_eq!(context.chain_id, self.context().extra().chain_id());
201        match query {
202            Query::System(query) => {
203                let outcome = self.system.handle_query(context, query);
204                Ok(outcome.into())
205            }
206            Query::User {
207                application_id,
208                bytes,
209            } => {
210                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
211                let outcome = match endpoint {
212                    Some(endpoint) => {
213                        self.query_user_application_with_long_lived_service(
214                            application_id,
215                            context,
216                            bytes,
217                            &mut endpoint.incoming_execution_requests,
218                            &mut endpoint.runtime_request_sender,
219                        )
220                        .await?
221                    }
222                    None => {
223                        self.query_user_application(application_id, context, bytes)
224                            .await?
225                    }
226                };
227                Ok(outcome.into())
228            }
229        }
230    }
231
232    async fn query_user_application(
233        &mut self,
234        application_id: ApplicationId,
235        context: QueryContext,
236        query: Vec<u8>,
237    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
238        self.query_user_application_with_deadline(
239            application_id,
240            context,
241            query,
242            None,
243            BTreeMap::new(),
244        )
245        .await
246    }
247
248    pub(crate) async fn query_user_application_with_deadline(
249        &mut self,
250        application_id: ApplicationId,
251        context: QueryContext,
252        query: Vec<u8>,
253        deadline: Option<Instant>,
254        created_blobs: BTreeMap<BlobId, BlobContent>,
255    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
256        let (execution_state_sender, mut execution_state_receiver) =
257            futures::channel::mpsc::unbounded();
258        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
259        let mut resource_controller = ResourceController::default();
260        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
261        let (code, description) = actor.load_service(application_id).await?;
262
263        let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
264            let mut runtime =
265                ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
266
267            async move {
268                let code = codes.next().await.expect("we send this immediately below");
269                runtime.preload_service(application_id, code, description)?;
270                runtime.run_query(application_id, query)
271            }
272        })
273        .await;
274
275        service_runtime_task.send(code)?;
276
277        while let Some(request) = execution_state_receiver.next().await {
278            actor.handle_request(request).await?;
279        }
280
281        service_runtime_task.join().await
282    }
283
284    async fn query_user_application_with_long_lived_service(
285        &mut self,
286        application_id: ApplicationId,
287        context: QueryContext,
288        query: Vec<u8>,
289        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
290            ExecutionRequest,
291        >,
292        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
293    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
294        let (outcome_sender, outcome_receiver) = oneshot::channel();
295        let mut outcome_receiver = outcome_receiver.fuse();
296
297        runtime_request_sender
298            .send(ServiceRuntimeRequest::Query {
299                application_id,
300                context,
301                query,
302                callback: outcome_sender,
303            })
304            .expect("Service runtime thread should only stop when `request_sender` is dropped");
305
306        let mut txn_tracker = TransactionTracker::default();
307        let mut resource_controller = ResourceController::default();
308        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
309
310        loop {
311            futures::select! {
312                maybe_request = incoming_execution_requests.next() => {
313                    if let Some(request) = maybe_request {
314                        actor.handle_request(request).await?;
315                    }
316                }
317                outcome = &mut outcome_receiver => {
318                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
319                }
320            }
321        }
322    }
323
324    pub async fn list_applications(
325        &self,
326    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
327        let mut applications = vec![];
328        for app_id in self.users.indices().await? {
329            let blob_id = app_id.description_blob_id();
330            let blob_content = self.system.read_blob_content(blob_id).await?;
331            let application_description = bcs::from_bytes(blob_content.bytes())?;
332            applications.push((app_id, application_description));
333        }
334        Ok(applications)
335    }
336}