linera_core/chain_worker/
actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! An actor that runs a chain worker.
5
6use std::{
7    collections::{BTreeMap, HashSet},
8    fmt,
9    sync::{self, Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use futures::FutureExt;
14use linera_base::{
15    crypto::{CryptoHash, ValidatorPublicKey},
16    data_types::{ApplicationDescription, Blob, BlockHeight, Epoch, TimeDelta, Timestamp},
17    hashed::Hashed,
18    identifiers::{ApplicationId, BlobId, ChainId},
19};
20use linera_chain::{
21    data_types::{BlockProposal, MessageBundle, ProposedBlock},
22    types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
23    ChainStateView,
24};
25use linera_execution::{
26    ExecutionStateView, Query, QueryContext, QueryOutcome, ServiceRuntimeEndpoint,
27    ServiceSyncRuntime,
28};
29use linera_storage::{Clock as _, Storage};
30use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
31use tracing::{debug, instrument, trace, warn, Instrument as _};
32
33use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
34use crate::{
35    data_types::{ChainInfoQuery, ChainInfoResponse},
36    value_cache::ValueCache,
37    worker::{NetworkActions, WorkerError},
38};
39
40/// A request for the [`ChainWorkerActor`].
41#[derive(Debug)]
42pub enum ChainWorkerRequest<Context>
43where
44    Context: linera_views::context::Context + Clone + Send + Sync + 'static,
45{
46    /// Reads the certificate for a requested [`BlockHeight`].
47    #[cfg(with_testing)]
48    ReadCertificate {
49        height: BlockHeight,
50        #[debug(skip)]
51        callback: oneshot::Sender<Result<Option<ConfirmedBlockCertificate>, WorkerError>>,
52    },
53
54    /// Search for a bundle in one of the chain's inboxes.
55    #[cfg(with_testing)]
56    FindBundleInInbox {
57        inbox_id: ChainId,
58        certificate_hash: CryptoHash,
59        height: BlockHeight,
60        index: u32,
61        #[debug(skip)]
62        callback: oneshot::Sender<Result<Option<MessageBundle>, WorkerError>>,
63    },
64
65    /// Request a read-only view of the [`ChainStateView`].
66    GetChainStateView {
67        #[debug(skip)]
68        callback:
69            oneshot::Sender<Result<OwnedRwLockReadGuard<ChainStateView<Context>>, WorkerError>>,
70    },
71
72    /// Query an application's state.
73    QueryApplication {
74        query: Query,
75        #[debug(skip)]
76        callback: oneshot::Sender<Result<QueryOutcome, WorkerError>>,
77    },
78
79    /// Describe an application.
80    DescribeApplication {
81        application_id: ApplicationId,
82        #[debug(skip)]
83        callback: oneshot::Sender<Result<ApplicationDescription, WorkerError>>,
84    },
85
86    /// Execute a block but discard any changes to the chain state.
87    StageBlockExecution {
88        block: ProposedBlock,
89        round: Option<u32>,
90        published_blobs: Vec<Blob>,
91        #[debug(skip)]
92        callback: oneshot::Sender<Result<(Block, ChainInfoResponse), WorkerError>>,
93    },
94
95    /// Process a leader timeout issued for this multi-owner chain.
96    ProcessTimeout {
97        certificate: TimeoutCertificate,
98        #[debug(skip)]
99        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
100    },
101
102    /// Handle a proposal for the next block on this chain.
103    HandleBlockProposal {
104        proposal: BlockProposal,
105        #[debug(skip)]
106        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
107    },
108
109    /// Process a validated block issued for this multi-owner chain.
110    ProcessValidatedBlock {
111        certificate: ValidatedBlockCertificate,
112        #[debug(skip)]
113        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions, bool), WorkerError>>,
114    },
115
116    /// Process a confirmed block (a commit).
117    ProcessConfirmedBlock {
118        certificate: ConfirmedBlockCertificate,
119        #[debug(with = "elide_option")]
120        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
121        #[debug(skip)]
122        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
123    },
124
125    /// Process a cross-chain update.
126    ProcessCrossChainUpdate {
127        origin: ChainId,
128        bundles: Vec<(Epoch, MessageBundle)>,
129        #[debug(skip)]
130        callback: oneshot::Sender<Result<Option<BlockHeight>, WorkerError>>,
131    },
132
133    /// Handle cross-chain request to confirm that the recipient was updated.
134    ConfirmUpdatedRecipient {
135        recipient: ChainId,
136        latest_height: BlockHeight,
137        #[debug(skip)]
138        callback: oneshot::Sender<Result<(), WorkerError>>,
139    },
140
141    /// Handle a [`ChainInfoQuery`].
142    HandleChainInfoQuery {
143        query: ChainInfoQuery,
144        #[debug(skip)]
145        callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
146    },
147
148    /// Get a blob if it belongs to the current locking block or pending proposal.
149    DownloadPendingBlob {
150        blob_id: BlobId,
151        #[debug(skip)]
152        callback: oneshot::Sender<Result<Blob, WorkerError>>,
153    },
154
155    /// Handle a blob that belongs to a pending proposal or validated block certificate.
156    HandlePendingBlob {
157        blob: Blob,
158        #[debug(skip)]
159        callback: oneshot::Sender<Result<ChainInfoResponse, WorkerError>>,
160    },
161
162    /// Update the received certificate trackers to at least the given values.
163    UpdateReceivedCertificateTrackers {
164        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
165        callback: oneshot::Sender<Result<(), WorkerError>>,
166    },
167}
168
/// The actor worker type.
///
/// Holds the handles needed to build a `ChainWorkerState` for a single chain whenever
/// requests arrive; see [`ChainWorkerActor::handle_requests`].
pub struct ChainWorkerActor<StorageClient>
where
    StorageClient: Storage + Clone + Send + Sync + 'static,
{
    /// The chain served by this actor.
    chain_id: ChainId,
    /// Worker configuration; this actor reads its `ttl` and `long_lived_services` settings.
    config: ChainWorkerConfig,
    /// Storage client handed to each loaded worker state; also provides the clock used for
    /// the idle timeout.
    storage: StorageClient,
    /// Shared cache of hashed blocks, keyed by [`CryptoHash`].
    block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Shared cache of execution state views, keyed by [`CryptoHash`].
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
    /// Optional shared set of chain IDs, forwarded unchanged to each loaded worker state.
    tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
    /// Delivery notifier, forwarded unchanged to each loaded worker state.
    delivery_notifier: DeliveryNotifier,
}
182
183impl<StorageClient> ChainWorkerActor<StorageClient>
184where
185    StorageClient: Storage + Clone + Send + Sync + 'static,
186{
187    /// Runs the [`ChainWorkerActor`], first by loading the chain state from `storage` then
188    /// handling all `incoming_requests` as they arrive.
189    ///
190    /// If loading the chain state fails, the next request will receive the error reported by the
191    /// `storage`, and the actor will then try again to load the state.
192    #[expect(clippy::too_many_arguments)]
193    pub async fn run(
194        config: ChainWorkerConfig,
195        storage: StorageClient,
196        block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
197        execution_state_cache: Arc<
198            ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
199        >,
200        tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
201        delivery_notifier: DeliveryNotifier,
202        chain_id: ChainId,
203        mut incoming_requests: mpsc::UnboundedReceiver<(
204            ChainWorkerRequest<StorageClient::Context>,
205            tracing::Span,
206        )>,
207    ) {
208        let actor = loop {
209            let load_result = Self::load(
210                config.clone(),
211                storage.clone(),
212                block_cache.clone(),
213                execution_state_cache.clone(),
214                tracked_chains.clone(),
215                delivery_notifier.clone(),
216                chain_id,
217            )
218            .await
219            .inspect_err(|error| warn!("Failed to load chain state: {error:?}"));
220
221            match load_result {
222                Ok(actor) => break actor,
223                Err(error) => match incoming_requests.recv().await {
224                    Some((request, _span)) => request.send_error(error),
225                    None => return,
226                },
227            }
228        };
229
230        if let Err(err) = actor.handle_requests(incoming_requests).await {
231            tracing::error!("Chain actor error: {err}");
232        }
233    }
234
235    /// Creates a [`ChainWorkerActor`], loading it with the chain state for the requested
236    /// [`ChainId`].
237    pub async fn load(
238        config: ChainWorkerConfig,
239        storage: StorageClient,
240        block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
241        execution_state_cache: Arc<
242            ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
243        >,
244        tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
245        delivery_notifier: DeliveryNotifier,
246        chain_id: ChainId,
247    ) -> Result<Self, WorkerError> {
248        Ok(ChainWorkerActor {
249            config,
250            storage,
251            block_values,
252            execution_state_cache,
253            tracked_chains,
254            delivery_notifier,
255            chain_id,
256        })
257    }
258
259    /// Spawns a blocking task to execute the service runtime actor.
260    ///
261    /// Returns the task handle and the endpoints to interact with the actor.
262    async fn spawn_service_runtime_actor(
263        chain_id: ChainId,
264    ) -> (linera_base::task::Blocking, ServiceRuntimeEndpoint) {
265        let context = QueryContext {
266            chain_id,
267            next_block_height: BlockHeight(0),
268            local_time: Timestamp::from(0),
269        };
270
271        let (execution_state_sender, incoming_execution_requests) =
272            futures::channel::mpsc::unbounded();
273        let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();
274
275        let service_runtime_thread = linera_base::task::Blocking::spawn(move |_| async move {
276            ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver)
277        })
278        .await;
279
280        let endpoint = ServiceRuntimeEndpoint {
281            incoming_execution_requests,
282            runtime_request_sender,
283        };
284        (service_runtime_thread, endpoint)
285    }
286
287    /// Sleeps for the configured TTL.
288    pub(super) async fn sleep_until_timeout(&self) {
289        let now = self.storage.clock().current_time();
290        let ttl =
291            TimeDelta::from_micros(u64::try_from(self.config.ttl.as_micros()).unwrap_or(u64::MAX));
292        let timeout = now.saturating_add(ttl);
293        self.storage.clock().sleep_until(timeout).await
294    }
295
296    /// Runs the worker until there are no more incoming requests.
297    #[instrument(
298        skip_all,
299        fields(chain_id = format!("{:.8}", self.chain_id)),
300    )]
301    pub async fn handle_requests(
302        self,
303        mut incoming_requests: mpsc::UnboundedReceiver<(
304            ChainWorkerRequest<StorageClient::Context>,
305            tracing::Span,
306        )>,
307    ) -> Result<(), WorkerError> {
308        trace!("Starting `ChainWorkerActor`");
309
310        while let Some((request, span)) = incoming_requests.recv().await {
311            let (service_runtime_thread, service_runtime_endpoint) = {
312                if self.config.long_lived_services {
313                    let (thread, endpoint) = Self::spawn_service_runtime_actor(self.chain_id).await;
314                    (Some(thread), Some(endpoint))
315                } else {
316                    (None, None)
317                }
318            };
319
320            let mut worker = ChainWorkerState::load(
321                self.config.clone(),
322                self.storage.clone(),
323                self.block_values.clone(),
324                self.execution_state_cache.clone(),
325                self.tracked_chains.clone(),
326                self.delivery_notifier.clone(),
327                self.chain_id,
328                service_runtime_endpoint,
329            )
330            .await?;
331
332            Box::pin(worker.handle_request(request).instrument(span)).await;
333
334            loop {
335                futures::select! {
336                    () = self.sleep_until_timeout().fuse() => break,
337                    maybe_request = incoming_requests.recv().fuse() => {
338                        let Some((request, span)) = maybe_request else {
339                            break; // Request sender was dropped.
340                        };
341                        Box::pin(worker.handle_request(request).instrument(span)).await;
342                    }
343                }
344            }
345
346            worker.clear_shared_chain_view().await;
347            drop(worker);
348            if let Some(thread) = service_runtime_thread {
349                thread.join().await
350            }
351        }
352
353        trace!("`ChainWorkerActor` finished");
354        Ok(())
355    }
356}
357
358impl<Context> ChainWorkerRequest<Context>
359where
360    Context: linera_views::context::Context + Clone + Send + Sync + 'static,
361{
362    /// Responds to this request with an `error`.
363    pub fn send_error(self, error: WorkerError) {
364        debug!("Immediately sending error to chain worker request {self:?}");
365
366        let responded = match self {
367            #[cfg(with_testing)]
368            ChainWorkerRequest::ReadCertificate { callback, .. } => {
369                callback.send(Err(error)).is_ok()
370            }
371            #[cfg(with_testing)]
372            ChainWorkerRequest::FindBundleInInbox { callback, .. } => {
373                callback.send(Err(error)).is_ok()
374            }
375            ChainWorkerRequest::GetChainStateView { callback } => callback.send(Err(error)).is_ok(),
376            ChainWorkerRequest::QueryApplication { callback, .. } => {
377                callback.send(Err(error)).is_ok()
378            }
379            ChainWorkerRequest::DescribeApplication { callback, .. } => {
380                callback.send(Err(error)).is_ok()
381            }
382            ChainWorkerRequest::StageBlockExecution { callback, .. } => {
383                callback.send(Err(error)).is_ok()
384            }
385            ChainWorkerRequest::ProcessTimeout { callback, .. } => {
386                callback.send(Err(error)).is_ok()
387            }
388            ChainWorkerRequest::HandleBlockProposal { callback, .. } => {
389                callback.send(Err(error)).is_ok()
390            }
391            ChainWorkerRequest::ProcessValidatedBlock { callback, .. } => {
392                callback.send(Err(error)).is_ok()
393            }
394            ChainWorkerRequest::ProcessConfirmedBlock { callback, .. } => {
395                callback.send(Err(error)).is_ok()
396            }
397            ChainWorkerRequest::ProcessCrossChainUpdate { callback, .. } => {
398                callback.send(Err(error)).is_ok()
399            }
400            ChainWorkerRequest::ConfirmUpdatedRecipient { callback, .. } => {
401                callback.send(Err(error)).is_ok()
402            }
403            ChainWorkerRequest::HandleChainInfoQuery { callback, .. } => {
404                callback.send(Err(error)).is_ok()
405            }
406            ChainWorkerRequest::DownloadPendingBlob { callback, .. } => {
407                callback.send(Err(error)).is_ok()
408            }
409            ChainWorkerRequest::HandlePendingBlob { callback, .. } => {
410                callback.send(Err(error)).is_ok()
411            }
412            ChainWorkerRequest::UpdateReceivedCertificateTrackers { callback, .. } => {
413                callback.send(Err(error)).is_ok()
414            }
415        };
416
417        if !responded {
418            warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
419        }
420    }
421}
422
/// Formats an option without revealing its payload: `Some(..)` when a value is present,
/// `None` otherwise.
fn elide_option<T>(option: &Option<T>, f: &mut fmt::Formatter) -> fmt::Result {
    let rendered = if option.is_some() {
        "Some(..)"
    } else {
        "None"
    };
    f.write_str(rendered)
}