linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    ops::{Deref, DerefMut},
7    vec,
8};
9
10use allocative::Allocative;
11use futures::{FutureExt, StreamExt};
12use linera_base::{
13    crypto::{BcsHashable, CryptoHash},
14    data_types::{BlobContent, BlockHeight, StreamUpdate},
15    identifiers::{AccountOwner, BlobId, ChainId, StreamId},
16    time::Instant,
17};
18use linera_views::{
19    context::Context,
20    historical_hash_wrapper::HistoricallyHashableView,
21    key_value_store_view::KeyValueStoreView,
22    map_view::MapView,
23    reentrant_collection_view::ReentrantCollectionView,
24    views::{ClonableView, ReplaceContext, View},
25    ViewError,
26};
27use serde::{Deserialize, Serialize};
28#[cfg(with_testing)]
29use {
30    crate::{
31        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
32    },
33    linera_base::data_types::Blob,
34    linera_views::context::MemoryContext,
35    std::sync::Arc,
36};
37
38use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
39use crate::{
40    execution_state_actor::ExecutionStateActor, resources::ResourceController,
41    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
42    ExecutionRuntimeConfig, ExecutionRuntimeContext, JsVec, MessageContext, OperationContext,
43    ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
44    TransactionTracker,
45};
46
/// An inner view accessing the execution state of a chain, for hashing purposes.
///
/// `ExecutionStateView` stores this struct inside a `HistoricallyHashableView` and exposes
/// its fields through `Deref`/`DerefMut`.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateViewInner<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications: one key-value store view per `ApplicationId`.
    pub users: ReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The heights of previous blocks that sent messages to the same recipients.
    /// Keyed by the `ChainId` of the recipient.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// The heights of previous blocks that published events to the same streams.
    /// Keyed by `StreamId`.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
}
60
61impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateViewInner<C> {
62    type Target = ExecutionStateViewInner<C2>;
63
64    async fn with_context(
65        &mut self,
66        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
67    ) -> Self::Target {
68        ExecutionStateViewInner {
69            system: self.system.with_context(ctx.clone()).await,
70            users: self.users.with_context(ctx.clone()).await,
71            previous_message_blocks: self.previous_message_blocks.with_context(ctx.clone()).await,
72            previous_event_blocks: self.previous_event_blocks.with_context(ctx.clone()).await,
73        }
74    }
75}
76
/// A view accessing the execution state of a chain.
///
/// Wraps an `ExecutionStateViewInner` in a `HistoricallyHashableView`. The inner fields are
/// reachable through the `Deref`/`DerefMut` implementations of this type, and
/// `crypto_hash_mut` derives a `CryptoHash` from the wrapper's historical hash.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
    /// The actual execution state, wrapped so that its historical hash can be queried.
    inner: HistoricallyHashableView<C, ExecutionStateViewInner<C>>,
}
83
84impl<C> Deref for ExecutionStateView<C> {
85    type Target = ExecutionStateViewInner<C>;
86
87    fn deref(&self) -> &ExecutionStateViewInner<C> {
88        self.inner.deref()
89    }
90}
91
92impl<C> DerefMut for ExecutionStateView<C> {
93    fn deref_mut(&mut self) -> &mut ExecutionStateViewInner<C> {
94        self.inner.deref_mut()
95    }
96}
97
98impl<C> ExecutionStateView<C>
99where
100    C: Context + Clone + Send + Sync + 'static,
101    C::Extra: ExecutionRuntimeContext,
102{
103    pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
104        #[derive(Serialize, Deserialize)]
105        struct ExecutionStateViewHash([u8; 32]);
106        impl BcsHashable<'_> for ExecutionStateViewHash {}
107        let hash = self.inner.historical_hash().await?;
108        Ok(CryptoHash::new(&ExecutionStateViewHash(hash.into())))
109    }
110}
111
112impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
113    type Target = ExecutionStateView<C2>;
114
115    async fn with_context(
116        &mut self,
117        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
118    ) -> Self::Target {
119        ExecutionStateView {
120            inner: self.inner.with_context(ctx.clone()).await,
121        }
122    }
123}
124
/// How to interact with a long-lived service runtime.
///
/// `ExecutionStateView::query_application` accepts an optional endpoint; when one is given,
/// user queries are sent through `runtime_request_sender` and the execution requests arriving
/// on `incoming_execution_requests` are handled until the query outcome is received.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
132
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// The steps are, in order:
    /// 1. record the application description blob, the contract blob and the service blob
    ///    in `system.used_blobs`;
    /// 2. register `contract` as the user contract of the application in the runtime context;
    /// 3. add the three blobs to the runtime context;
    /// 4. run a `UserAction::Instantiate` action with `instantiation_argument`, using a
    ///    no-fees resource policy and a transaction tracker that already contains the
    ///    application description blob as a created blob.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of the
    /// runtime context of this view.
    ///
    /// # Errors
    ///
    /// Propagates any error from the blob bookkeeping or from running the action.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        // The instantiation runs as an unauthenticated operation at the height recorded in
        // the description.
        let context = OperationContext {
            chain_id,
            authenticated_owner: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(context, instantiation_argument);
        let next_application_index = application_description.application_index + 1;

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        self.context()
            .extra()
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        let stored_blobs = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(stored_blobs).await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        // The fourth argument is the next chain index, which starts at zero here.
        let mut txn_tracker =
            TransactionTracker::new(local_time, 0, next_application_index, 0, None, &[]);
        txn_tracker.add_created_blob(description_blob);

        Box::pin(
            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
                .run_user_action(application_id, action, context.refund_grant_to(), None),
        )
        .await?;

        Ok(())
    }
}
205
/// An action that can be run on a user application, together with the context it runs in.
pub enum UserAction {
    /// Instantiates the application; carries the operation context and the instantiation
    /// argument bytes.
    Instantiate(OperationContext, Vec<u8>),
    /// An operation; carries the operation context and a byte payload.
    Operation(OperationContext, Vec<u8>),
    /// A message; carries the message context and a byte payload.
    Message(MessageContext, Vec<u8>),
    /// Stream processing; carries its context and the list of stream updates.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
212
213impl UserAction {
214    pub(crate) fn signer(&self) -> Option<AccountOwner> {
215        match self {
216            UserAction::Instantiate(context, _) => context.authenticated_owner,
217            UserAction::Operation(context, _) => context.authenticated_owner,
218            UserAction::ProcessStreams(_, _) => None,
219            UserAction::Message(context, _) => context.authenticated_owner,
220        }
221    }
222
223    pub(crate) fn height(&self) -> BlockHeight {
224        match self {
225            UserAction::Instantiate(context, _) => context.height,
226            UserAction::Operation(context, _) => context.height,
227            UserAction::ProcessStreams(context, _) => context.height,
228            UserAction::Message(context, _) => context.height,
229        }
230    }
231
232    pub(crate) fn round(&self) -> Option<u32> {
233        match self {
234            UserAction::Instantiate(context, _) => context.round,
235            UserAction::Operation(context, _) => context.round,
236            UserAction::ProcessStreams(context, _) => context.round,
237            UserAction::Message(context, _) => context.round,
238        }
239    }
240
241    pub(crate) fn timestamp(&self) -> Timestamp {
242        match self {
243            UserAction::Instantiate(context, _) => context.timestamp,
244            UserAction::Operation(context, _) => context.timestamp,
245            UserAction::ProcessStreams(context, _) => context.timestamp,
246            UserAction::Message(context, _) => context.timestamp,
247        }
248    }
249}
250
251impl<C> ExecutionStateView<C>
252where
253    C: Context + Clone + Send + Sync + 'static,
254    C::Extra: ExecutionRuntimeContext,
255{
256    pub async fn query_application(
257        &mut self,
258        context: QueryContext,
259        query: Query,
260        endpoint: Option<&mut ServiceRuntimeEndpoint>,
261    ) -> Result<QueryOutcome, ExecutionError> {
262        assert_eq!(context.chain_id, self.context().extra().chain_id());
263        match query {
264            Query::System(query) => {
265                let outcome = self.system.handle_query(context, query);
266                Ok(outcome.into())
267            }
268            Query::User {
269                application_id,
270                bytes,
271            } => {
272                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
273                let outcome = match endpoint {
274                    Some(endpoint) => {
275                        self.query_user_application_with_long_lived_service(
276                            application_id,
277                            context,
278                            bytes,
279                            &mut endpoint.incoming_execution_requests,
280                            &mut endpoint.runtime_request_sender,
281                        )
282                        .await?
283                    }
284                    None => {
285                        self.query_user_application(application_id, context, bytes)
286                            .await?
287                    }
288                };
289                Ok(outcome.into())
290            }
291        }
292    }
293
294    async fn query_user_application(
295        &mut self,
296        application_id: ApplicationId,
297        context: QueryContext,
298        query: Vec<u8>,
299    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
300        self.query_user_application_with_deadline(
301            application_id,
302            context,
303            query,
304            None,
305            BTreeMap::new(),
306        )
307        .await
308    }
309
310    pub(crate) async fn query_user_application_with_deadline(
311        &mut self,
312        application_id: ApplicationId,
313        context: QueryContext,
314        query: Vec<u8>,
315        deadline: Option<Instant>,
316        created_blobs: BTreeMap<BlobId, BlobContent>,
317    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
318        let (execution_state_sender, mut execution_state_receiver) =
319            futures::channel::mpsc::unbounded();
320        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
321        let mut resource_controller = ResourceController::default();
322        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
323
324        let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
325
326        let thread = web_thread::Thread::new();
327        let service_runtime_task = thread.run_send(JsVec(codes), move |codes| async move {
328            let mut runtime =
329                ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
330
331            for (code, description) in codes.0.into_iter().zip(descriptions) {
332                runtime.preload_service(ApplicationId::from(&description), code, description)?;
333            }
334
335            runtime.run_query(application_id, query)
336        });
337
338        while let Some(request) = execution_state_receiver.next().await {
339            actor.handle_request(request).await?;
340        }
341
342        service_runtime_task.await?
343    }
344
345    async fn query_user_application_with_long_lived_service(
346        &mut self,
347        application_id: ApplicationId,
348        context: QueryContext,
349        query: Vec<u8>,
350        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
351            ExecutionRequest,
352        >,
353        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
354    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
355        let (outcome_sender, outcome_receiver) = oneshot::channel();
356        let mut outcome_receiver = outcome_receiver.fuse();
357
358        runtime_request_sender
359            .send(ServiceRuntimeRequest::Query {
360                application_id,
361                context,
362                query,
363                callback: outcome_sender,
364            })
365            .expect("Service runtime thread should only stop when `request_sender` is dropped");
366
367        let mut txn_tracker = TransactionTracker::default();
368        let mut resource_controller = ResourceController::default();
369        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
370
371        loop {
372            futures::select! {
373                maybe_request = incoming_execution_requests.next() => {
374                    if let Some(request) = maybe_request {
375                        actor.handle_request(request).await?;
376                    }
377                }
378                outcome = &mut outcome_receiver => {
379                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
380                }
381            }
382        }
383    }
384
385    pub async fn list_applications(
386        &self,
387    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
388        let mut applications = vec![];
389        for app_id in self.users.indices().await? {
390            let blob_id = app_id.description_blob_id();
391            let blob_content = self.system.read_blob_content(blob_id).await?;
392            let application_description = bcs::from_bytes(blob_content.bytes())?;
393            applications.push((app_id, application_description));
394        }
395        Ok(applications)
396    }
397}