linera_core/chain_worker/
actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! An actor that runs a chain worker.
5
6use std::{
7    collections::{BTreeMap, HashSet},
8    fmt,
9    sync::{Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use linera_base::{
14    crypto::{CryptoHash, ValidatorPublicKey},
15    data_types::{ApplicationDescription, Blob, BlockHeight, Epoch, Timestamp},
16    hashed::Hashed,
17    identifiers::{ApplicationId, BlobId, ChainId},
18};
19use linera_chain::{
20    data_types::{BlockProposal, MessageBundle, ProposedBlock},
21    types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
22    ChainStateView,
23};
24use linera_execution::{
25    ExecutionStateView, Query, QueryContext, QueryOutcome, ServiceRuntimeEndpoint,
26    ServiceSyncRuntime,
27};
28use linera_storage::Storage;
29use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
30use tracing::{debug, instrument, trace, warn, Instrument as _};
31
32use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
33use crate::{
34    data_types::{ChainInfoQuery, ChainInfoResponse},
35    value_cache::ValueCache,
36    worker::{NetworkActions, WorkerError},
37};
38
39/// A request for the [`ChainWorkerActor`].
40#[derive(Debug)]
41pub enum ChainWorkerRequest<Context>
42where
43    Context: linera_views::context::Context + Clone + Send + Sync + 'static,
44{
45    /// Reads the certificate for a requested [`BlockHeight`].
46    #[cfg(with_testing)]
47    ReadCertificate {
48        height: BlockHeight,
49        #[debug(skip)]
50        callback: oneshot::Sender<Result<Option<ConfirmedBlockCertificate>, WorkerError>>,
51    },
52
53    /// Search for a bundle in one of the chain's inboxes.
54    #[cfg(with_testing)]
55    FindBundleInInbox {
56        inbox_id: ChainId,
57        certificate_hash: CryptoHash,
58        height: BlockHeight,
59        index: u32,
60        #[debug(skip)]
61        callback: oneshot::Sender<Result<Option<MessageBundle>, WorkerError>>,
62    },
63
64    /// Request a read-only view of the [`ChainStateView`].
65    GetChainStateView {
66        #[debug(skip)]
67        callback:
68            oneshot::Sender<Result<OwnedRwLockReadGuard<ChainStateView<Context>>, WorkerError>>,
69    },
70
71    /// Query an application's state.
72    QueryApplication {
73        query: Query,
74        #[debug(skip)]
75        callback: oneshot::Sender<Result<QueryOutcome, WorkerError>>,
76    },
77
78    /// Describe an application.
79    DescribeApplication {
80        application_id: ApplicationId,
81        #[debug(skip)]
82        callback: oneshot::Sender<Result<ApplicationDescription, WorkerError>>,
83    },
84
85    /// Execute a block but discard any changes to the chain state.
86    StageBlockExecution {
87        block: ProposedBlock,
88        round: Option<u32>,
89        published_blobs: Vec<Blob>,
90        #[debug(skip)]
91        callback: oneshot::Sender<Result<(Block, ChainInfoResponse), WorkerError>>,
92    },
93
94    /// Process a leader timeout issued for this multi-owner chain.
95    ProcessTimeout {
96        certificate: TimeoutCertificate,
97        #[debug(skip)]
98        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
99    },
100
101    /// Handle a proposal for the next block on this chain.
102    HandleBlockProposal {
103        proposal: BlockProposal,
104        #[debug(skip)]
105        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
106    },
107
108    /// Process a validated block issued for this multi-owner chain.
109    ProcessValidatedBlock {
110        certificate: ValidatedBlockCertificate,
111        #[debug(skip)]
112        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions, bool), WorkerError>>,
113    },
114
115    /// Process a confirmed block (a commit).
116    ProcessConfirmedBlock {
117        certificate: ConfirmedBlockCertificate,
118        #[debug(with = "elide_option")]
119        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
120        #[debug(skip)]
121        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
122    },
123
124    /// Preprocess a block without executing it.
125    PreprocessCertificate {
126        certificate: ConfirmedBlockCertificate,
127        #[debug(skip)]
128        callback: oneshot::Sender<Result<NetworkActions, WorkerError>>,
129    },
130
131    /// Process a cross-chain update.
132    ProcessCrossChainUpdate {
133        origin: ChainId,
134        bundles: Vec<(Epoch, MessageBundle)>,
135        #[debug(skip)]
136        callback: oneshot::Sender<Result<Option<BlockHeight>, WorkerError>>,
137    },
138
139    /// Handle cross-chain request to confirm that the recipient was updated.
140    ConfirmUpdatedRecipient {
141        recipient: ChainId,
142        latest_height: BlockHeight,
143        #[debug(skip)]
144        callback: oneshot::Sender<Result<(), WorkerError>>,
145    },
146
147    /// Handle a [`ChainInfoQuery`].
148    HandleChainInfoQuery {
149        query: ChainInfoQuery,
150        #[debug(skip)]
151        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
152    },
153
154    /// Get a blob if it belongs to the current locking block or pending proposal.
155    DownloadPendingBlob {
156        blob_id: BlobId,
157        #[debug(skip)]
158        callback: oneshot::Sender<Result<Blob, WorkerError>>,
159    },
160
161    /// Handle a blob that belongs to a pending proposal or validated block certificate.
162    HandlePendingBlob {
163        blob: Blob,
164        #[debug(skip)]
165        callback: oneshot::Sender<Result<ChainInfoResponse, WorkerError>>,
166    },
167
168    /// Update the received certificate trackers to at least the given values.
169    UpdateReceivedCertificateTrackers {
170        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
171        callback: oneshot::Sender<Result<(), WorkerError>>,
172    },
173}
174
/// The actor worker type.
///
/// Owns the [`ChainWorkerState`] on which requests are executed and, optionally, the handle
/// of the blocking task that runs the service runtime.
pub struct ChainWorkerActor<StorageClient>
where
    StorageClient: Storage + Clone + Send + Sync + 'static,
{
    /// The state whose methods are invoked to answer each request.
    worker: ChainWorkerState<StorageClient>,
    /// Handle of the service runtime task. It is `Some` only when the actor was loaded with
    /// `config.long_lived_services` enabled, and is joined once all requests were handled.
    service_runtime_thread: Option<linera_base::task::Blocking>,
}
183
184impl<StorageClient> ChainWorkerActor<StorageClient>
185where
186    StorageClient: Storage + Clone + Send + Sync + 'static,
187{
188    /// Runs the [`ChainWorkerActor`], first by loading the chain state from `storage` then
189    /// handling all `incoming_requests` as they arrive.
190    ///
191    /// If loading the chain state fails, the next request will receive the error reported by the
192    /// `storage`, and the actor will then try again to load the state.
193    #[expect(clippy::too_many_arguments)]
194    pub async fn run(
195        config: ChainWorkerConfig,
196        storage: StorageClient,
197        block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
198        execution_state_cache: Arc<
199            ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
200        >,
201        tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
202        delivery_notifier: DeliveryNotifier,
203        chain_id: ChainId,
204        mut incoming_requests: mpsc::UnboundedReceiver<(
205            ChainWorkerRequest<StorageClient::Context>,
206            tracing::Span,
207        )>,
208    ) {
209        let actor = loop {
210            let load_result = Self::load(
211                config.clone(),
212                storage.clone(),
213                block_cache.clone(),
214                execution_state_cache.clone(),
215                tracked_chains.clone(),
216                delivery_notifier.clone(),
217                chain_id,
218            )
219            .await
220            .inspect_err(|error| warn!("Failed to load chain state: {error:?}"));
221
222            match load_result {
223                Ok(actor) => break actor,
224                Err(error) => match incoming_requests.recv().await {
225                    Some((request, _span)) => request.send_error(error),
226                    None => return,
227                },
228            }
229        };
230
231        actor.handle_requests(incoming_requests).await;
232    }
233
234    /// Creates a [`ChainWorkerActor`], loading it with the chain state for the requested
235    /// [`ChainId`].
236    pub async fn load(
237        config: ChainWorkerConfig,
238        storage: StorageClient,
239        block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
240        execution_state_cache: Arc<
241            ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
242        >,
243        tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
244        delivery_notifier: DeliveryNotifier,
245        chain_id: ChainId,
246    ) -> Result<Self, WorkerError> {
247        let (service_runtime_thread, service_runtime_endpoint) = {
248            if config.long_lived_services {
249                let (thread, endpoint) = Self::spawn_service_runtime_actor(chain_id).await;
250                (Some(thread), Some(endpoint))
251            } else {
252                (None, None)
253            }
254        };
255
256        let worker = ChainWorkerState::load(
257            config,
258            storage,
259            block_cache,
260            execution_state_cache,
261            tracked_chains,
262            delivery_notifier,
263            chain_id,
264            service_runtime_endpoint,
265        )
266        .await?;
267
268        Ok(ChainWorkerActor {
269            worker,
270            service_runtime_thread,
271        })
272    }
273
274    /// Spawns a blocking task to execute the service runtime actor.
275    ///
276    /// Returns the task handle and the endpoints to interact with the actor.
277    async fn spawn_service_runtime_actor(
278        chain_id: ChainId,
279    ) -> (linera_base::task::Blocking, ServiceRuntimeEndpoint) {
280        let context = QueryContext {
281            chain_id,
282            next_block_height: BlockHeight(0),
283            local_time: Timestamp::from(0),
284        };
285
286        let (execution_state_sender, incoming_execution_requests) =
287            futures::channel::mpsc::unbounded();
288        let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();
289
290        let service_runtime_thread = linera_base::task::Blocking::spawn(move |_| async move {
291            ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver)
292        })
293        .await;
294
295        let endpoint = ServiceRuntimeEndpoint {
296            incoming_execution_requests,
297            runtime_request_sender,
298        };
299        (service_runtime_thread, endpoint)
300    }
301
302    /// Runs the worker until there are no more incoming requests.
303    #[instrument(
304        name = "ChainWorkerActor::handle_requests",
305        skip_all,
306        fields(chain_id = format!("{:.8}", self.worker.chain_id())),
307    )]
308    pub async fn handle_requests(
309        mut self,
310        mut incoming_requests: mpsc::UnboundedReceiver<(
311            ChainWorkerRequest<StorageClient::Context>,
312            tracing::Span,
313        )>,
314    ) {
315        trace!("Starting `ChainWorkerActor`");
316
317        while let Some((request, span)) = incoming_requests.recv().await {
318            Box::pin(self.handle_request(request).instrument(span)).await;
319        }
320
321        if let Some(thread) = self.service_runtime_thread {
322            drop(self.worker);
323            thread.join().await
324        }
325
326        trace!("`ChainWorkerActor` finished");
327    }
328
329    /// Runs the worker until there are no more incoming requests.
330    #[instrument(skip(self, request))]
331    pub async fn handle_request(&mut self, request: ChainWorkerRequest<StorageClient::Context>) {
332        // TODO(#2237): Spawn concurrent tasks for read-only operations
333        let responded = match request {
334            #[cfg(with_testing)]
335            ChainWorkerRequest::ReadCertificate { height, callback } => callback
336                .send(self.worker.read_certificate(height).await)
337                .is_ok(),
338            #[cfg(with_testing)]
339            ChainWorkerRequest::FindBundleInInbox {
340                inbox_id,
341                certificate_hash,
342                height,
343                index,
344                callback,
345            } => callback
346                .send(
347                    self.worker
348                        .find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
349                        .await,
350                )
351                .is_ok(),
352            ChainWorkerRequest::GetChainStateView { callback } => {
353                callback.send(self.worker.chain_state_view().await).is_ok()
354            }
355            ChainWorkerRequest::QueryApplication { query, callback } => callback
356                .send(self.worker.query_application(query).await)
357                .is_ok(),
358            ChainWorkerRequest::DescribeApplication {
359                application_id,
360                callback,
361            } => callback
362                .send(self.worker.describe_application(application_id).await)
363                .is_ok(),
364            ChainWorkerRequest::StageBlockExecution {
365                block,
366                round,
367                published_blobs,
368                callback,
369            } => callback
370                .send(
371                    self.worker
372                        .stage_block_execution(block, round, &published_blobs)
373                        .await,
374                )
375                .is_ok(),
376            ChainWorkerRequest::ProcessTimeout {
377                certificate,
378                callback,
379            } => callback
380                .send(self.worker.process_timeout(certificate).await)
381                .is_ok(),
382            ChainWorkerRequest::HandleBlockProposal { proposal, callback } => callback
383                .send(self.worker.handle_block_proposal(proposal).await)
384                .is_ok(),
385            ChainWorkerRequest::ProcessValidatedBlock {
386                certificate,
387                callback,
388            } => callback
389                .send(self.worker.process_validated_block(certificate).await)
390                .is_ok(),
391            ChainWorkerRequest::ProcessConfirmedBlock {
392                certificate,
393                notify_when_messages_are_delivered,
394                callback,
395            } => callback
396                .send(
397                    self.worker
398                        .process_confirmed_block(certificate, notify_when_messages_are_delivered)
399                        .await,
400                )
401                .is_ok(),
402            ChainWorkerRequest::PreprocessCertificate {
403                certificate,
404                callback,
405            } => callback
406                .send(self.worker.preprocess_certificate(certificate).await)
407                .is_ok(),
408            ChainWorkerRequest::ProcessCrossChainUpdate {
409                origin,
410                bundles,
411                callback,
412            } => callback
413                .send(
414                    self.worker
415                        .process_cross_chain_update(origin, bundles)
416                        .await,
417                )
418                .is_ok(),
419            ChainWorkerRequest::ConfirmUpdatedRecipient {
420                recipient,
421                latest_height,
422                callback,
423            } => callback
424                .send(
425                    self.worker
426                        .confirm_updated_recipient(recipient, latest_height)
427                        .await,
428                )
429                .is_ok(),
430            ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback
431                .send(self.worker.handle_chain_info_query(query).await)
432                .is_ok(),
433            ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback
434                .send(self.worker.download_pending_blob(blob_id).await)
435                .is_ok(),
436            ChainWorkerRequest::HandlePendingBlob { blob, callback } => callback
437                .send(self.worker.handle_pending_blob(blob).await)
438                .is_ok(),
439            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
440                new_trackers,
441                callback,
442            } => callback
443                .send(
444                    self.worker
445                        .update_received_certificate_trackers(new_trackers)
446                        .await,
447                )
448                .is_ok(),
449        };
450
451        if !responded {
452            warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
453        }
454    }
455}
456
457impl<Context> ChainWorkerRequest<Context>
458where
459    Context: linera_views::context::Context + Clone + Send + Sync + 'static,
460{
461    /// Responds to this request with an `error`.
462    pub fn send_error(self, error: WorkerError) {
463        debug!("Immediately sending error to chain worker request {self:?}");
464
465        let responded = match self {
466            #[cfg(with_testing)]
467            ChainWorkerRequest::ReadCertificate { callback, .. } => {
468                callback.send(Err(error)).is_ok()
469            }
470            #[cfg(with_testing)]
471            ChainWorkerRequest::FindBundleInInbox { callback, .. } => {
472                callback.send(Err(error)).is_ok()
473            }
474            ChainWorkerRequest::GetChainStateView { callback } => callback.send(Err(error)).is_ok(),
475            ChainWorkerRequest::QueryApplication { callback, .. } => {
476                callback.send(Err(error)).is_ok()
477            }
478            ChainWorkerRequest::DescribeApplication { callback, .. } => {
479                callback.send(Err(error)).is_ok()
480            }
481            ChainWorkerRequest::StageBlockExecution { callback, .. } => {
482                callback.send(Err(error)).is_ok()
483            }
484            ChainWorkerRequest::ProcessTimeout { callback, .. } => {
485                callback.send(Err(error)).is_ok()
486            }
487            ChainWorkerRequest::HandleBlockProposal { callback, .. } => {
488                callback.send(Err(error)).is_ok()
489            }
490            ChainWorkerRequest::ProcessValidatedBlock { callback, .. } => {
491                callback.send(Err(error)).is_ok()
492            }
493            ChainWorkerRequest::ProcessConfirmedBlock { callback, .. } => {
494                callback.send(Err(error)).is_ok()
495            }
496            ChainWorkerRequest::PreprocessCertificate { callback, .. } => {
497                callback.send(Err(error)).is_ok()
498            }
499            ChainWorkerRequest::ProcessCrossChainUpdate { callback, .. } => {
500                callback.send(Err(error)).is_ok()
501            }
502            ChainWorkerRequest::ConfirmUpdatedRecipient { callback, .. } => {
503                callback.send(Err(error)).is_ok()
504            }
505            ChainWorkerRequest::HandleChainInfoQuery { callback, .. } => {
506                callback.send(Err(error)).is_ok()
507            }
508            ChainWorkerRequest::DownloadPendingBlob { callback, .. } => {
509                callback.send(Err(error)).is_ok()
510            }
511            ChainWorkerRequest::HandlePendingBlob { callback, .. } => {
512                callback.send(Err(error)).is_ok()
513            }
514            ChainWorkerRequest::UpdateReceivedCertificateTrackers { callback, .. } => {
515                callback.send(Err(error)).is_ok()
516            }
517        };
518
519        if !responded {
520            warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
521        }
522    }
523}
524
/// Formats an option without its contents: `Some(..)` when a value is present, `None`
/// otherwise.
///
/// Used as a custom `Debug` formatter for fields whose inner value should not be printed.
fn elide_option<T>(option: &Option<T>, f: &mut fmt::Formatter) -> fmt::Result {
    let rendered = if option.is_some() { "Some(..)" } else { "None" };
    f.write_str(rendered)
}