Skip to main content

linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    ops::{Deref, DerefMut},
7    vec,
8};
9
10use allocative::Allocative;
11use futures::{FutureExt, StreamExt};
12use linera_base::{
13    crypto::{BcsHashable, CryptoHash},
14    data_types::{Blob, BlobContent, BlockHeight, OracleResponse, StreamUpdate},
15    ensure,
16    identifiers::{AccountOwner, BlobId, BlobType, ChainId, GenericApplicationId, StreamId},
17    time::Instant,
18};
19use linera_views::{
20    context::Context,
21    historical_hash_wrapper::HistoricallyHashableView,
22    key_value_store_view::KeyValueStoreView,
23    map_view::MapView,
24    reentrant_collection_view::ReentrantCollectionView,
25    views::{ClonableView, ReplaceContext, View},
26    ViewError,
27};
28use serde::{Deserialize, Serialize};
29#[cfg(with_testing)]
30use {
31    crate::{
32        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
33    },
34    linera_views::context::MemoryContext,
35    std::sync::Arc,
36};
37
38use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
39use crate::{
40    execution_state_actor::ExecutionStateActor,
41    resources::ResourceController,
42    system::{SystemExecutionStateView, SystemMessage},
43    transaction_tracker::PreparedCheckpoint,
44    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext, JsVec, Message,
45    MessageContext, OperationContext, OutgoingMessage, ProcessStreamsContext, Query, QueryContext,
46    QueryOutcome, ServiceSyncRuntime, Timestamp, TransactionTracker,
47};
48
/// An inner view accessing the execution state of a chain, for hashing purposes.
///
/// [`ExecutionStateView`] stores this view inside a [`HistoricallyHashableView`] and
/// dereferences to it, so the fields below are reachable directly from the outer view.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateViewInner<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications, each with its own key-value store, indexed by application ID.
    pub users: ReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The heights of previous blocks that sent messages to the same recipients,
    /// indexed by recipient chain.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// The heights of previous blocks that published events to the same streams,
    /// indexed by stream.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
}
62
63impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateViewInner<C> {
64    type Target = ExecutionStateViewInner<C2>;
65
66    async fn with_context(
67        &mut self,
68        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
69    ) -> Self::Target {
70        ExecutionStateViewInner {
71            system: self.system.with_context(ctx.clone()).await,
72            users: self.users.with_context(ctx.clone()).await,
73            previous_message_blocks: self.previous_message_blocks.with_context(ctx.clone()).await,
74            previous_event_blocks: self.previous_event_blocks.with_context(ctx.clone()).await,
75        }
76    }
77}
78
/// A view accessing the execution state of a chain.
///
/// This is a thin wrapper around [`ExecutionStateViewInner`]: the inner view is held in
/// a [`HistoricallyHashableView`] (used by `crypto_hash_mut` to obtain the state hash),
/// and the `Deref`/`DerefMut` implementations expose the inner fields directly.
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
    /// The wrapped inner view together with its historical-hash bookkeeping.
    inner: HistoricallyHashableView<C, ExecutionStateViewInner<C>>,
}
85
86impl<C> Deref for ExecutionStateView<C> {
87    type Target = ExecutionStateViewInner<C>;
88
89    fn deref(&self) -> &ExecutionStateViewInner<C> {
90        self.inner.deref()
91    }
92}
93
94impl<C> DerefMut for ExecutionStateView<C> {
95    fn deref_mut(&mut self) -> &mut ExecutionStateViewInner<C> {
96        self.inner.deref_mut()
97    }
98}
99
100impl<C> ExecutionStateView<C>
101where
102    C: Context + Clone + 'static,
103    C::Extra: ExecutionRuntimeContext,
104{
105    /// Computes the cryptographic hash of the execution state.
106    pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
107        #[derive(Serialize, Deserialize)]
108        struct ExecutionStateViewHash([u8; 32]);
109        impl BcsHashable<'_> for ExecutionStateViewHash {}
110        let hash = self.inner.historical_hash().await?;
111        Ok(CryptoHash::new(&ExecutionStateViewHash(hash.into())))
112    }
113
114    /// Validates the execution-state-level preconditions for a `SystemOperation::Checkpoint`
115    /// and dumps the inner view's persisted content as one or more [`Blob`]s. The dump is
116    /// split into chunks of at most `maximum_blob_size` bytes (from the current epoch's
117    /// resource policy) so each chunk respects the per-blob size limit even for very
118    /// large states. The blobs are not yet published; the caller is expected to register
119    /// them during transaction execution via [`Self::apply_checkpoint`].
120    ///
121    /// This is a *pre-block* operation: it must run before the block-level setup mutates
122    /// the chain state (e.g. setting `system.timestamp`), because `dump_content` reads
123    /// from storage and refuses to run with pending in-memory changes. Splitting the
124    /// dump out of the operation handler also guarantees the captured bytes represent
125    /// the chain's pre-block state, which is exactly what a bootstrapping node will
126    /// `restore_from_content` from before re-applying the certified checkpoint block.
127    pub async fn prepare_checkpoint(
128        &mut self,
129        maximum_blob_size: u64,
130    ) -> Result<Vec<Blob>, ExecutionError> {
131        // User event streams are summarized and pruned by the checkpoint itself (see
132        // `ExecutionStateActor`'s checkpoint handler), so they do not block checkpointing.
133        // System event streams (e.g. the epoch streams on the admin chain) are not
134        // summarized, so a chain that published any is still not allowed to checkpoint.
135        let mut had_system_event_block = false;
136        self.previous_event_blocks
137            .for_each_index_while(|stream_id| {
138                if matches!(stream_id.application_id, GenericApplicationId::System) {
139                    had_system_event_block = true;
140                    Ok(false)
141                } else {
142                    Ok(true)
143                }
144            })
145            .await?;
146        ensure!(
147            !had_system_event_block,
148            ExecutionError::CheckpointPreconditionFailed("chain has published system events")
149        );
150
151        let (bytes, _content_hash) = self.inner.dump_content().await?;
152        let chunk_size = usize::try_from(maximum_blob_size).unwrap_or(usize::MAX);
153        Ok(bytes
154            .chunks(chunk_size)
155            .map(|chunk| {
156                Blob::new(BlobContent::new(
157                    BlobType::CheckpointExecutionState,
158                    chunk.to_vec(),
159                ))
160            })
161            .collect())
162    }
163
164    /// Registers the pre-block-computed checkpoint inputs (from
165    /// [`Self::prepare_checkpoint`]) with the transaction tracker. This: publishes the
166    /// execution-state blobs, records the matching [`OracleResponse::Checkpoint`] (which
167    /// also lists every blob the chain currently references in `used_blobs` so a
168    /// bootstrapping node can fetch them from shared storage), and emits a
169    /// [`SystemMessage::CheckpointAck`] to each origin chain so the origin can later trim
170    /// its outbox dump of already-delivered messages.
171    pub async fn apply_checkpoint(
172        &mut self,
173        prepared: PreparedCheckpoint,
174        txn_tracker: &mut TransactionTracker,
175    ) -> Result<(), ExecutionError> {
176        let PreparedCheckpoint {
177            blobs,
178            origin_cursors,
179            inbox_cursors,
180            outbox_block_hashes,
181        } = prepared;
182        let execution_state_blobs = blobs.iter().map(|blob| blob.id().hash).collect();
183        let used_blobs = self.system.used_blobs.indices().await?;
184        for blob in blobs {
185            txn_tracker.add_created_blob(blob);
186        }
187        txn_tracker.replay_oracle_response(OracleResponse::Checkpoint {
188            execution_state_blobs,
189            used_blobs,
190            outbox_block_hashes,
191            inbox_cursors,
192        })?;
193        for (origin, latest_received_cursor) in origin_cursors {
194            txn_tracker.add_outgoing_message(OutgoingMessage::new(
195                origin,
196                Message::System(SystemMessage::CheckpointAck {
197                    latest_received_cursor,
198                }),
199            ));
200        }
201        // We just emitted notifications for everyone in `pending_checkpoint_ack_targets`;
202        // reset the set so the next own checkpoint only fires for origins that send
203        // us a fresh non-`Checkpoint` message in the meantime.
204        self.system.pending_checkpoint_ack_targets.clear();
205        Ok(())
206    }
207
    /// Replaces the persisted execution state with the content of a checkpoint blob,
    /// recording the hash of the bytes as the new stored hash. The caller is
    /// contractually obliged to reload the view after this returns.
    ///
    /// This simply delegates to the wrapped view's `restore_from_content` and discards
    /// whatever value it returns.
    ///
    /// # Errors
    ///
    /// Propagates any [`ViewError`] reported by the wrapped view.
    pub async fn restore_from_content(&mut self, bytes: &[u8]) -> Result<(), ViewError> {
        self.inner.restore_from_content(bytes).await?;
        Ok(())
    }
215}
216
217impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
218    type Target = ExecutionStateView<C2>;
219
220    async fn with_context(
221        &mut self,
222        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
223    ) -> Self::Target {
224        ExecutionStateView {
225            inner: self.inner.with_context(ctx.clone()).await,
226        }
227    }
228}
229
/// How to interact with a long-lived service runtime.
///
/// `ExecutionStateView::query_application` uses this endpoint, when one is provided,
/// instead of starting a fresh service runtime for the query.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests: the [`ExecutionRequest`]s that arrive here are handled
    /// against the execution state while a query is in flight.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime: queries are submitted as
    /// [`ServiceRuntimeRequest`]s through this sender.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
237
238#[cfg(with_testing)]
239impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
240where
241    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
242{
243    /// Simulates the instantiation of an application.
244    pub async fn simulate_instantiation(
245        &mut self,
246        contract: UserContractCode,
247        local_time: linera_base::data_types::Timestamp,
248        application_description: ApplicationDescription,
249        instantiation_argument: Vec<u8>,
250        contract_blob: Blob,
251        service_blob: Blob,
252    ) -> Result<(), ExecutionError> {
253        let chain_id = application_description.creator_chain_id;
254        assert_eq!(chain_id, self.context().extra().chain_id);
255        let context = OperationContext {
256            chain_id,
257            authenticated_owner: None,
258            height: application_description.block_height,
259            round: None,
260            timestamp: local_time,
261        };
262
263        let action = UserAction::Instantiate(context, instantiation_argument);
264        let next_application_index = application_description.application_index + 1;
265        let next_chain_index = 0;
266
267        let application_id = From::from(&application_description);
268        let blob = Blob::new_application_description(&application_description);
269
270        self.system.used_blobs.insert(&blob.id())?;
271        self.system.used_blobs.insert(&contract_blob.id())?;
272        self.system.used_blobs.insert(&service_blob.id())?;
273
274        self.context()
275            .extra()
276            .user_contracts()
277            .pin()
278            .insert(application_id, contract);
279
280        self.context()
281            .extra()
282            .add_blobs([
283                contract_blob,
284                service_blob,
285                Blob::new_application_description(&application_description),
286            ])
287            .await?;
288
289        let tracker = ResourceTracker::default();
290        let policy = ResourceControlPolicy::no_fees();
291        let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
292        let mut txn_tracker = TransactionTracker::new(
293            local_time,
294            0,
295            next_application_index,
296            next_chain_index,
297            None,
298            &[],
299        );
300        txn_tracker.add_created_blob(blob);
301        Box::pin(
302            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
303                .run_user_action(application_id, action, context.refund_grant_to(), None),
304        )
305        .await?;
306
307        Ok(())
308    }
309}
310
/// An action to run against a user application, paired with the context it runs in.
pub enum UserAction {
    /// Instantiate the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// Execute an operation; the bytes are presumably the serialized operation
    /// (TODO confirm against the caller).
    Operation(OperationContext, Vec<u8>),
    /// Execute an incoming message; the bytes are presumably the serialized message
    /// (TODO confirm against the caller).
    Message(MessageContext, Vec<u8>),
    /// Process the given stream updates.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
    /// Summarize the events of the given stream updates (NOTE(review): presumably
    /// used while checkpointing; confirm against `ExecutionStateActor`).
    SummarizeEvents(ProcessStreamsContext, Vec<StreamUpdate>),
}
318
319impl UserAction {
320    pub(crate) fn signer(&self) -> Option<AccountOwner> {
321        match self {
322            UserAction::Instantiate(context, _) => context.authenticated_owner,
323            UserAction::Operation(context, _) => context.authenticated_owner,
324            UserAction::ProcessStreams(_, _) => None,
325            UserAction::SummarizeEvents(_, _) => None,
326            UserAction::Message(context, _) => context.authenticated_owner,
327        }
328    }
329
330    pub(crate) fn height(&self) -> BlockHeight {
331        match self {
332            UserAction::Instantiate(context, _) => context.height,
333            UserAction::Operation(context, _) => context.height,
334            UserAction::ProcessStreams(context, _) => context.height,
335            UserAction::SummarizeEvents(context, _) => context.height,
336            UserAction::Message(context, _) => context.height,
337        }
338    }
339
340    pub(crate) fn round(&self) -> Option<u32> {
341        match self {
342            UserAction::Instantiate(context, _) => context.round,
343            UserAction::Operation(context, _) => context.round,
344            UserAction::ProcessStreams(context, _) => context.round,
345            UserAction::SummarizeEvents(context, _) => context.round,
346            UserAction::Message(context, _) => context.round,
347        }
348    }
349
350    pub(crate) fn timestamp(&self) -> Timestamp {
351        match self {
352            UserAction::Instantiate(context, _) => context.timestamp,
353            UserAction::Operation(context, _) => context.timestamp,
354            UserAction::ProcessStreams(context, _) => context.timestamp,
355            UserAction::SummarizeEvents(context, _) => context.timestamp,
356            UserAction::Message(context, _) => context.timestamp,
357        }
358    }
359}
360
361impl<C> ExecutionStateView<C>
362where
363    C: Context + Clone + 'static,
364    C::Extra: ExecutionRuntimeContext,
365{
366    /// Runs a query against the given application and returns its response.
367    pub async fn query_application(
368        &mut self,
369        context: QueryContext,
370        query: Query,
371        endpoint: Option<&mut ServiceRuntimeEndpoint>,
372    ) -> Result<QueryOutcome, ExecutionError> {
373        assert_eq!(context.chain_id, self.context().extra().chain_id());
374        match query {
375            Query::System(query) => {
376                let outcome = self.system.handle_query(context, query);
377                Ok(outcome.into())
378            }
379            Query::User {
380                application_id,
381                bytes,
382            } => {
383                let outcome = match endpoint {
384                    Some(endpoint) => {
385                        self.query_user_application_with_long_lived_service(
386                            application_id,
387                            context,
388                            bytes,
389                            &mut endpoint.incoming_execution_requests,
390                            &endpoint.runtime_request_sender,
391                        )
392                        .await?
393                    }
394                    None => {
395                        self.query_user_application(application_id, context, bytes)
396                            .await?
397                    }
398                };
399                Ok(outcome.into())
400            }
401        }
402    }
403
404    async fn query_user_application(
405        &mut self,
406        application_id: ApplicationId,
407        context: QueryContext,
408        query: Vec<u8>,
409    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
410        self.query_user_application_with_deadline(
411            application_id,
412            context,
413            query,
414            None,
415            BTreeMap::new(),
416        )
417        .await
418    }
419
420    pub(crate) async fn query_user_application_with_deadline(
421        &mut self,
422        application_id: ApplicationId,
423        context: QueryContext,
424        query: Vec<u8>,
425        deadline: Option<Instant>,
426        created_blobs: BTreeMap<BlobId, BlobContent>,
427    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
428        let (execution_state_sender, mut execution_state_receiver) =
429            futures::channel::mpsc::unbounded();
430        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
431        let mut resource_controller = ResourceController::default();
432        let thread_pool = self.context().extra().thread_pool().clone();
433        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
434
435        let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
436
437        let service_runtime_task = thread_pool
438            .run_send(JsVec(codes), move |codes| async move {
439                let mut runtime = ServiceSyncRuntime::new_with_deadline(
440                    execution_state_sender,
441                    context,
442                    deadline,
443                );
444
445                for (code, description) in codes.0.into_iter().zip(descriptions) {
446                    runtime.preload_service(ApplicationId::from(&description), code, description);
447                }
448
449                runtime.run_query(application_id, query)
450            })
451            .await;
452
453        while let Some(request) = execution_state_receiver.next().await {
454            actor.handle_request(request).await?;
455        }
456
457        service_runtime_task.await?
458    }
459
460    async fn query_user_application_with_long_lived_service(
461        &mut self,
462        application_id: ApplicationId,
463        context: QueryContext,
464        query: Vec<u8>,
465        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
466            ExecutionRequest,
467        >,
468        runtime_request_sender: &std::sync::mpsc::Sender<ServiceRuntimeRequest>,
469    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
470        let (outcome_sender, outcome_receiver) = oneshot::channel();
471        let mut outcome_receiver = outcome_receiver.fuse();
472
473        runtime_request_sender
474            .send(ServiceRuntimeRequest::Query {
475                application_id,
476                context,
477                query,
478                callback: outcome_sender,
479            })
480            .expect("Service runtime thread should only stop when `request_sender` is dropped");
481
482        let mut txn_tracker = TransactionTracker::default();
483        let mut resource_controller = ResourceController::default();
484        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
485
486        loop {
487            futures::select! {
488                maybe_request = incoming_execution_requests.next() => {
489                    if let Some(request) = maybe_request {
490                        actor.handle_request(request).await?;
491                    }
492                }
493                outcome = &mut outcome_receiver => {
494                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
495                }
496            }
497        }
498    }
499
500    /// Returns the descriptions of all applications registered on this chain.
501    pub async fn list_applications(
502        &self,
503    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
504        let mut applications = vec![];
505        for app_id in self.users.indices().await? {
506            let blob_id = app_id.description_blob_id();
507            let blob_content = self.system.read_blob_content(blob_id).await?;
508            let application_description = bcs::from_bytes(blob_content.bytes())?;
509            applications.push((app_id, application_description));
510        }
511        Ok(applications)
512    }
513}