linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    ops::{Deref, DerefMut},
7    vec,
8};
9
10use allocative::Allocative;
11use futures::{FutureExt, StreamExt};
12use linera_base::{
13    crypto::{BcsHashable, CryptoHash},
14    data_types::{BlobContent, BlockHeight, StreamUpdate},
15    identifiers::{AccountOwner, BlobId, ChainId, StreamId},
16    time::Instant,
17};
18use linera_views::{
19    context::Context,
20    historical_hash_wrapper::HistoricallyHashableView,
21    key_value_store_view::KeyValueStoreView,
22    map_view::MapView,
23    reentrant_collection_view::ReentrantCollectionView,
24    views::{ClonableView, ReplaceContext, View},
25    ViewError,
26};
27use serde::{Deserialize, Serialize};
28#[cfg(with_testing)]
29use {
30    crate::{
31        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
32    },
33    linera_base::data_types::Blob,
34    linera_views::context::MemoryContext,
35    std::sync::Arc,
36};
37
38use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
39use crate::{
40    execution_state_actor::ExecutionStateActor, resources::ResourceController,
41    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
42    ExecutionRuntimeContext, JsVec, MessageContext, OperationContext, ProcessStreamsContext, Query,
43    QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp, TransactionTracker,
44};
45
/// An inner view accessing the execution state of a chain, for hashing purposes.
///
/// [`ExecutionStateView`] stores this struct inside a [`HistoricallyHashableView`] and
/// dereferences to it, so these fields are reachable directly from the outer view.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateViewInner<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications: one key-value store per application ID.
    pub users: ReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The heights of previous blocks that sent messages to the same recipients,
    /// indexed by chain ID.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// The heights of previous blocks that published events to the same streams,
    /// indexed by stream ID.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
}
59
60impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateViewInner<C> {
61    type Target = ExecutionStateViewInner<C2>;
62
63    async fn with_context(
64        &mut self,
65        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
66    ) -> Self::Target {
67        ExecutionStateViewInner {
68            system: self.system.with_context(ctx.clone()).await,
69            users: self.users.with_context(ctx.clone()).await,
70            previous_message_blocks: self.previous_message_blocks.with_context(ctx.clone()).await,
71            previous_event_blocks: self.previous_event_blocks.with_context(ctx.clone()).await,
72        }
73    }
74}
75
/// A view accessing the execution state of a chain.
///
/// This is a thin wrapper: the actual state lives in an [`ExecutionStateViewInner`]
/// held by a [`HistoricallyHashableView`]. Field access goes through the `Deref` and
/// `DerefMut` implementations of this type.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
    /// The hashable wrapper around the actual execution state.
    inner: HistoricallyHashableView<C, ExecutionStateViewInner<C>>,
}
82
83impl<C> Deref for ExecutionStateView<C> {
84    type Target = ExecutionStateViewInner<C>;
85
86    fn deref(&self) -> &ExecutionStateViewInner<C> {
87        self.inner.deref()
88    }
89}
90
91impl<C> DerefMut for ExecutionStateView<C> {
92    fn deref_mut(&mut self) -> &mut ExecutionStateViewInner<C> {
93        self.inner.deref_mut()
94    }
95}
96
97impl<C> ExecutionStateView<C>
98where
99    C: Context + Clone + 'static,
100    C::Extra: ExecutionRuntimeContext,
101{
102    pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
103        #[derive(Serialize, Deserialize)]
104        struct ExecutionStateViewHash([u8; 32]);
105        impl BcsHashable<'_> for ExecutionStateViewHash {}
106        let hash = self.inner.historical_hash().await?;
107        Ok(CryptoHash::new(&ExecutionStateViewHash(hash.into())))
108    }
109}
110
111impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
112    type Target = ExecutionStateView<C2>;
113
114    async fn with_context(
115        &mut self,
116        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
117    ) -> Self::Target {
118        ExecutionStateView {
119            inner: self.inner.with_context(ctx.clone()).await,
120        }
121    }
122}
123
/// How to interact with a long-lived service runtime.
///
/// A query is submitted through `runtime_request_sender`. While waiting for its
/// outcome, the caller handles the execution requests that arrive on
/// `incoming_execution_requests` (presumably emitted by the runtime while it serves
/// the query).
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
131
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// The application description blob, the contract blob and the service blob are
    /// marked as used in the system state and added to the runtime context, and
    /// `contract` is registered under the application's ID. The `Instantiate` user
    /// action is then run with a no-fee resource policy, at the block height recorded
    /// in `application_description` and with `local_time` as the timestamp.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of
    /// this view's runtime context.
    ///
    /// # Errors
    ///
    /// Returns an [`ExecutionError`] if updating the state, adding the blobs or
    /// running the user action fails.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);
        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;

        let context = OperationContext {
            chain_id,
            authenticated_owner: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(context, instantiation_argument);

        // Mark the description, contract and service blobs as used, in that order.
        for blob_id in [description_blob.id(), contract_blob.id(), service_blob.id()] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        // Register the contract code so it can be found under the application ID.
        let runtime_context = self.context().extra();
        runtime_context
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        let blobs = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(blobs).await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_application_index,
            next_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        Box::pin(
            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
                .run_user_action(application_id, action, context.refund_grant_to(), None),
        )
        .await?;

        Ok(())
    }
}
204
/// A call into a user application, paired with the context it runs in.
pub enum UserAction {
    /// Instantiate the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// Run an operation (presumably the bytes are the serialized operation).
    Operation(OperationContext, Vec<u8>),
    /// Handle a message (presumably the bytes are the serialized message).
    Message(MessageContext, Vec<u8>),
    /// Process a batch of stream updates.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
211
212impl UserAction {
213    pub(crate) fn signer(&self) -> Option<AccountOwner> {
214        match self {
215            UserAction::Instantiate(context, _) => context.authenticated_owner,
216            UserAction::Operation(context, _) => context.authenticated_owner,
217            UserAction::ProcessStreams(_, _) => None,
218            UserAction::Message(context, _) => context.authenticated_owner,
219        }
220    }
221
222    pub(crate) fn height(&self) -> BlockHeight {
223        match self {
224            UserAction::Instantiate(context, _) => context.height,
225            UserAction::Operation(context, _) => context.height,
226            UserAction::ProcessStreams(context, _) => context.height,
227            UserAction::Message(context, _) => context.height,
228        }
229    }
230
231    pub(crate) fn round(&self) -> Option<u32> {
232        match self {
233            UserAction::Instantiate(context, _) => context.round,
234            UserAction::Operation(context, _) => context.round,
235            UserAction::ProcessStreams(context, _) => context.round,
236            UserAction::Message(context, _) => context.round,
237        }
238    }
239
240    pub(crate) fn timestamp(&self) -> Timestamp {
241        match self {
242            UserAction::Instantiate(context, _) => context.timestamp,
243            UserAction::Operation(context, _) => context.timestamp,
244            UserAction::ProcessStreams(context, _) => context.timestamp,
245            UserAction::Message(context, _) => context.timestamp,
246        }
247    }
248}
249
250impl<C> ExecutionStateView<C>
251where
252    C: Context + Clone + 'static,
253    C::Extra: ExecutionRuntimeContext,
254{
255    pub async fn query_application(
256        &mut self,
257        context: QueryContext,
258        query: Query,
259        endpoint: Option<&mut ServiceRuntimeEndpoint>,
260    ) -> Result<QueryOutcome, ExecutionError> {
261        assert_eq!(context.chain_id, self.context().extra().chain_id());
262        match query {
263            Query::System(query) => {
264                let outcome = self.system.handle_query(context, query);
265                Ok(outcome.into())
266            }
267            Query::User {
268                application_id,
269                bytes,
270            } => {
271                let outcome = match endpoint {
272                    Some(endpoint) => {
273                        self.query_user_application_with_long_lived_service(
274                            application_id,
275                            context,
276                            bytes,
277                            &mut endpoint.incoming_execution_requests,
278                            &mut endpoint.runtime_request_sender,
279                        )
280                        .await?
281                    }
282                    None => {
283                        self.query_user_application(application_id, context, bytes)
284                            .await?
285                    }
286                };
287                Ok(outcome.into())
288            }
289        }
290    }
291
292    async fn query_user_application(
293        &mut self,
294        application_id: ApplicationId,
295        context: QueryContext,
296        query: Vec<u8>,
297    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
298        self.query_user_application_with_deadline(
299            application_id,
300            context,
301            query,
302            None,
303            BTreeMap::new(),
304        )
305        .await
306    }
307
308    pub(crate) async fn query_user_application_with_deadline(
309        &mut self,
310        application_id: ApplicationId,
311        context: QueryContext,
312        query: Vec<u8>,
313        deadline: Option<Instant>,
314        created_blobs: BTreeMap<BlobId, BlobContent>,
315    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
316        let (execution_state_sender, mut execution_state_receiver) =
317            futures::channel::mpsc::unbounded();
318        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
319        let mut resource_controller = ResourceController::default();
320        let thread_pool = self.context().extra().thread_pool().clone();
321        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
322
323        let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
324
325        let service_runtime_task = thread_pool
326            .run_send(JsVec(codes), move |codes| async move {
327                let mut runtime = ServiceSyncRuntime::new_with_deadline(
328                    execution_state_sender,
329                    context,
330                    deadline,
331                );
332
333                for (code, description) in codes.0.into_iter().zip(descriptions) {
334                    runtime.preload_service(
335                        ApplicationId::from(&description),
336                        code,
337                        description,
338                    )?;
339                }
340
341                runtime.run_query(application_id, query)
342            })
343            .await;
344
345        while let Some(request) = execution_state_receiver.next().await {
346            actor.handle_request(request).await?;
347        }
348
349        service_runtime_task.await?
350    }
351
352    async fn query_user_application_with_long_lived_service(
353        &mut self,
354        application_id: ApplicationId,
355        context: QueryContext,
356        query: Vec<u8>,
357        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
358            ExecutionRequest,
359        >,
360        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
361    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
362        let (outcome_sender, outcome_receiver) = oneshot::channel();
363        let mut outcome_receiver = outcome_receiver.fuse();
364
365        runtime_request_sender
366            .send(ServiceRuntimeRequest::Query {
367                application_id,
368                context,
369                query,
370                callback: outcome_sender,
371            })
372            .expect("Service runtime thread should only stop when `request_sender` is dropped");
373
374        let mut txn_tracker = TransactionTracker::default();
375        let mut resource_controller = ResourceController::default();
376        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
377
378        loop {
379            futures::select! {
380                maybe_request = incoming_execution_requests.next() => {
381                    if let Some(request) = maybe_request {
382                        actor.handle_request(request).await?;
383                    }
384                }
385                outcome = &mut outcome_receiver => {
386                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
387                }
388            }
389        }
390    }
391
392    pub async fn list_applications(
393        &self,
394    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
395        let mut applications = vec![];
396        for app_id in self.users.indices().await? {
397            let blob_id = app_id.description_blob_id();
398            let blob_content = self.system.read_blob_content(blob_id).await?;
399            let application_description = bcs::from_bytes(blob_content.bytes())?;
400            applications.push((app_id, application_description));
401        }
402        Ok(applications)
403    }
404}