linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet, VecDeque},
7    num::NonZeroUsize,
8    sync::{Arc, Mutex, RwLock},
9    time::Duration,
10};
11
12use futures::future::Either;
13use linera_base::{
14    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
15    data_types::{
16        ApplicationDescription, ArithmeticError, Blob, BlockHeight, DecompressionError, Epoch,
17        Round,
18    },
19    doc_scalar,
20    hashed::Hashed,
21    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId},
22    time::timer::{sleep, timeout},
23};
24#[cfg(with_testing)]
25use linera_chain::ChainExecutionContext;
26use linera_chain::{
27    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
28    types::{
29        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
30        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
31    },
32    ChainError, ChainStateView,
33};
34use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
35use linera_storage::Storage;
36use linera_views::ViewError;
37use lru::LruCache;
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
41use tracing::{error, instrument, trace, warn};
42
43use crate::{
44    chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
45    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
46    join_set_ext::{JoinSet, JoinSetExt},
47    notifier::Notifier,
48    value_cache::ValueCache,
49};
50
51#[cfg(test)]
52#[path = "unit_tests/worker_tests.rs"]
53mod worker_tests;
54
/// Prometheus metrics recorded by the worker. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Histogram of the round number of handled certificates, labeled by certificate value
    /// and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number of handled block proposals, labeled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Unlabeled counter, incremented by the number of incoming bundles plus operations of
    /// each handled confirmed block certificate.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Unlabeled counter, incremented once per successfully processed confirmed block.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Counter labeled by validator name, updated for each signature found on a handled
    /// confirmed block certificate.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });
}
97
/// Instruct the networking layer to send cross-chain requests and/or push notifications.
///
/// The default value contains no requests and no notifications.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// The cross-chain requests.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// The push notifications.
    pub notifications: Vec<Notification>,
}
106
107impl NetworkActions {
108    pub fn extend(&mut self, other: NetworkActions) {
109        self.cross_chain_requests.extend(other.cross_chain_requests);
110        self.notifications.extend(other.notifications);
111    }
112}
113
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// Notification that a chain has a new certified block or a new message.
pub struct Notification {
    /// The chain this notification is about.
    pub chain_id: ChainId,
    /// What happened on that chain; see [`Reason`].
    pub reason: Reason,
}
120
// NOTE(review): presumably exposes `Notification` as a documented scalar type using the
// description below — confirm against the `doc_scalar!` definition in `linera_base`.
doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
125
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// Reason for the notification.
pub enum Reason {
    /// A new block; carries the block's height and hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// A new incoming message bundle; carries the origin chain and a block height.
    // NOTE(review): `height` is presumably the height of the sending block on `origin` — confirm.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round; carries the block height and the round.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
}
142
/// Error type for worker operations.
///
/// Wrapped errors use `#[error(transparent)]`, so their display output is that of the
/// underlying error.
#[derive(Debug, Error)]
pub enum WorkerError {
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A boxed [`ChainError`]. Note that the `From<ChainError>` implementation in this file
    /// maps an execution error reporting missing blobs to [`WorkerError::BlobsNotFound`]
    /// instead of this variant.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    // Chain access control
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    #[error(
        "Was expecting block height {expected_block_height} but found {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Cannot confirm a block before its predecessors: {current_block_height:?}")]
    MissingEarlierBlocks { current_block_height: BlockHeight },
    #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    // Other server-side errors
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    #[error(
        "The given outcome is not what we computed after executing the block.\n\
        Computed: {computed:#?}\n\
        Submitted: {submitted:#?}"
    )]
    IncorrectOutcome {
        computed: Box<BlockExecutionOutcome>,
        submitted: Box<BlockExecutionOutcome>,
    },
    #[error("The timestamp of a Tick operation is in the future.")]
    InvalidTimestamp,
    /// Returned by `full_certificate` when the block cache has no entry for the hash named
    /// by a lite certificate.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    /// Returned by `full_certificate` when a lite certificate cannot be combined with the
    /// cached value, or when its kind is neither confirmed nor validated.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    /// The listed blobs could not be found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    /// Returned by `get_chain_worker_endpoint` when no chain worker endpoint could be
    /// obtained before its retry timeout elapsed.
    #[error("The worker is too busy to handle new chains")]
    FullChainWorkerCache,
    /// Returned by `fully_handle_certificate_with_notifications` when awaiting its spawned
    /// task fails.
    #[error("Failed to join spawned worker task")]
    JoinError,
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error(transparent)]
    Decompression(#[from] DecompressionError),
}
229
230impl From<ChainError> for WorkerError {
231    #[instrument(level = "trace", skip(chain_error))]
232    fn from(chain_error: ChainError) -> Self {
233        match chain_error {
234            ChainError::ExecutionError(execution_error, context) => {
235                if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
236                    Self::BlobsNotFound(blob_ids)
237                } else {
238                    Self::ChainError(Box::new(ChainError::ExecutionError(
239                        execution_error,
240                        context,
241                    )))
242                }
243            }
244            error => Self::ChainError(Box::new(error)),
245        }
246    }
247}
248
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] carried by this error, checking that it was raised in
    /// `expected_context`.
    ///
    /// # Panics
    ///
    /// If this error does not wrap a chain-level execution error, or if the recorded context
    /// differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
270
/// State of a worker in a validator or a local node.
///
/// Apart from `nickname`, `storage` and `chain_worker_config`, the fields are `Arc`s (or an
/// optional `Arc`), so the manual `Clone` implementation shares them between clones.
pub struct WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// A name used for logging
    nickname: String,
    /// Access to local persistent storage.
    storage: StorageClient,
    /// Configuration options for the [`ChainWorker`]s.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of hashed blocks keyed by [`CryptoHash`]; used to turn lite certificates back
    /// into full certificates and handed to every chain worker actor.
    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Cache of execution state views keyed by [`CryptoHash`]; handed to every chain worker
    /// actor.
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
    /// Chain IDs that should be tracked by a worker.
    ///
    /// `None` when built with `new`; `Some` when built with `new_for_client`.
    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
    /// One-shot channels to notify callers when messages of a particular chain have been
    /// delivered.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The set of spawned [`ChainWorkerActor`] tasks.
    chain_worker_tasks: Arc<Mutex<JoinSet>>,
    /// The cache of running [`ChainWorkerActor`]s.
    chain_workers: Arc<Mutex<LruCache<ChainId, ChainActorEndpoint<StorageClient>>>>,
}
294
295impl<StorageClient> Clone for WorkerState<StorageClient>
296where
297    StorageClient: Storage + Clone,
298{
299    fn clone(&self) -> Self {
300        WorkerState {
301            nickname: self.nickname.clone(),
302            storage: self.storage.clone(),
303            chain_worker_config: self.chain_worker_config.clone(),
304            block_cache: self.block_cache.clone(),
305            execution_state_cache: self.execution_state_cache.clone(),
306            tracked_chains: self.tracked_chains.clone(),
307            delivery_notifiers: self.delivery_notifiers.clone(),
308            chain_worker_tasks: self.chain_worker_tasks.clone(),
309            chain_workers: self.chain_workers.clone(),
310        }
311    }
312}
313
/// The sender endpoint for [`ChainWorkerRequest`]s.
///
/// Each message pairs a request with a [`tracing::Span`]; `query_chain_worker` sends the span
/// that is current at the time of the call.
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
    ChainWorkerRequest<<StorageClient as Storage>::Context>,
    tracing::Span,
)>;
319
/// The [`DeliveryNotifier`] of each chain, keyed by [`ChainId`].
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
321
322impl<StorageClient> WorkerState<StorageClient>
323where
324    StorageClient: Storage,
325{
326    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
327    pub fn new(
328        nickname: String,
329        key_pair: Option<ValidatorSecretKey>,
330        storage: StorageClient,
331        chain_worker_limit: NonZeroUsize,
332    ) -> Self {
333        WorkerState {
334            nickname,
335            storage,
336            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
337            block_cache: Arc::new(ValueCache::default()),
338            execution_state_cache: Arc::new(ValueCache::default()),
339            tracked_chains: None,
340            delivery_notifiers: Arc::default(),
341            chain_worker_tasks: Arc::default(),
342            chain_workers: Arc::new(Mutex::new(LruCache::new(chain_worker_limit))),
343        }
344    }
345
346    #[instrument(level = "trace", skip(nickname, storage))]
347    pub fn new_for_client(
348        nickname: String,
349        storage: StorageClient,
350        tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
351        chain_worker_limit: NonZeroUsize,
352    ) -> Self {
353        WorkerState {
354            nickname,
355            storage,
356            chain_worker_config: ChainWorkerConfig::default(),
357            block_cache: Arc::new(ValueCache::default()),
358            execution_state_cache: Arc::new(ValueCache::default()),
359            tracked_chains: Some(tracked_chains),
360            delivery_notifiers: Arc::default(),
361            chain_worker_tasks: Arc::default(),
362            chain_workers: Arc::new(Mutex::new(LruCache::new(chain_worker_limit))),
363        }
364    }
365
366    #[instrument(level = "trace", skip(self, value))]
367    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
368        self.chain_worker_config.allow_inactive_chains = value;
369        self
370    }
371
372    #[instrument(level = "trace", skip(self, value))]
373    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
374        self.chain_worker_config
375            .allow_messages_from_deprecated_epochs = value;
376        self
377    }
378
379    #[instrument(level = "trace", skip(self, value))]
380    pub fn with_long_lived_services(mut self, value: bool) -> Self {
381        self.chain_worker_config.long_lived_services = value;
382        self
383    }
384
385    /// Returns an instance with the specified grace period, in microseconds.
386    ///
387    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
388    /// will wait until that timestamp before voting.
389    #[instrument(level = "trace", skip(self, grace_period))]
390    pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
391        self.chain_worker_config.grace_period = grace_period;
392        self
393    }
394
395    #[instrument(level = "trace", skip(self))]
396    pub fn nickname(&self) -> &str {
397        &self.nickname
398    }
399
400    /// Returns the storage client so that it can be manipulated or queried.
401    #[instrument(level = "trace", skip(self))]
402    #[cfg(not(feature = "test"))]
403    pub(crate) fn storage_client(&self) -> &StorageClient {
404        &self.storage
405    }
406
407    /// Returns the storage client so that it can be manipulated or queried by tests in other
408    /// crates.
409    #[instrument(level = "trace", skip(self))]
410    #[cfg(feature = "test")]
411    pub fn storage_client(&self) -> &StorageClient {
412        &self.storage
413    }
414
415    #[instrument(level = "trace", skip(self, key_pair))]
416    #[cfg(test)]
417    pub(crate) async fn with_key_pair(mut self, key_pair: Option<Arc<ValidatorSecretKey>>) -> Self {
418        self.chain_worker_config.key_pair = key_pair;
419        self.chain_workers.lock().unwrap().clear();
420        self
421    }
422
423    #[instrument(level = "trace", skip(self, certificate))]
424    pub(crate) async fn full_certificate(
425        &self,
426        certificate: LiteCertificate<'_>,
427    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
428        let block = self
429            .block_cache
430            .get(&certificate.value.value_hash)
431            .ok_or(WorkerError::MissingCertificateValue)?;
432
433        match certificate.value.kind {
434            linera_chain::types::CertificateKind::Confirmed => {
435                let value = ConfirmedBlock::from_hashed(block);
436                Ok(Either::Left(
437                    certificate
438                        .with_value(value)
439                        .ok_or(WorkerError::InvalidLiteCertificate)?,
440                ))
441            }
442            linera_chain::types::CertificateKind::Validated => {
443                let value = ValidatedBlock::from_hashed(block);
444                Ok(Either::Right(
445                    certificate
446                        .with_value(value)
447                        .ok_or(WorkerError::InvalidLiteCertificate)?,
448                ))
449            }
450            _ => return Err(WorkerError::InvalidLiteCertificate),
451        }
452    }
453}
454
/// A certificate value type whose certificates a [`WorkerState`] can process.
///
/// Implemented in this file for [`ConfirmedBlock`], [`ValidatedBlock`] and [`Timeout`].
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker` and returns the chain info response together
    /// with the resulting network actions.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
463
/// Confirmed block certificates are processed by `WorkerState::handle_confirmed_certificate`.
impl ProcessableCertificate for ConfirmedBlock {
    /// Delegates to `handle_confirmed_certificate` without a message-delivery notifier.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: ConfirmedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        worker.handle_confirmed_certificate(certificate, None).await
    }
}
472
/// Validated block certificates are processed by `WorkerState::handle_validated_certificate`.
impl ProcessableCertificate for ValidatedBlock {
    /// Delegates to `handle_validated_certificate`.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: ValidatedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        worker.handle_validated_certificate(certificate).await
    }
}
481
/// Timeout certificates are processed by `WorkerState::handle_timeout_certificate`.
impl ProcessableCertificate for Timeout {
    /// Delegates to `handle_timeout_certificate`.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        worker.handle_timeout_certificate(certificate).await
    }
}
490
491impl<StorageClient> WorkerState<StorageClient>
492where
493    StorageClient: Storage + Clone + Send + Sync + 'static,
494{
495    #[instrument(level = "trace", skip(self, certificate, notifier))]
496    #[inline]
497    pub async fn fully_handle_certificate_with_notifications<T>(
498        &self,
499        certificate: GenericCertificate<T>,
500        notifier: &impl Notifier,
501    ) -> Result<ChainInfoResponse, WorkerError>
502    where
503        T: ProcessableCertificate,
504    {
505        let notifications = (*notifier).clone();
506        let this = self.clone();
507        linera_base::task::spawn(async move {
508            let (response, actions) =
509                ProcessableCertificate::process_certificate(&this, certificate).await?;
510            notifications.notify(&actions.notifications);
511            let mut requests = VecDeque::from(actions.cross_chain_requests);
512            while let Some(request) = requests.pop_front() {
513                let actions = this.handle_cross_chain_request(request).await?;
514                requests.extend(actions.cross_chain_requests);
515                notifications.notify(&actions.notifications);
516            }
517            Ok(response)
518        })
519        .await
520        .unwrap_or_else(|_| Err(WorkerError::JoinError))
521    }
522
523    /// Tries to execute a block proposal without any verification other than block execution.
524    #[instrument(level = "trace", skip(self, block))]
525    pub async fn stage_block_execution(
526        &self,
527        block: ProposedBlock,
528        round: Option<u32>,
529        published_blobs: Vec<Blob>,
530    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
531        self.query_chain_worker(block.chain_id, move |callback| {
532            ChainWorkerRequest::StageBlockExecution {
533                block,
534                round,
535                published_blobs,
536                callback,
537            }
538        })
539        .await
540    }
541
542    /// Executes a [`Query`] for an application's state on a specific chain.
543    #[instrument(level = "trace", skip(self, chain_id, query))]
544    pub async fn query_application(
545        &self,
546        chain_id: ChainId,
547        query: Query,
548    ) -> Result<QueryOutcome, WorkerError> {
549        self.query_chain_worker(chain_id, move |callback| {
550            ChainWorkerRequest::QueryApplication { query, callback }
551        })
552        .await
553    }
554
555    #[instrument(level = "trace", skip(self, chain_id, application_id))]
556    pub async fn describe_application(
557        &self,
558        chain_id: ChainId,
559        application_id: ApplicationId,
560    ) -> Result<ApplicationDescription, WorkerError> {
561        self.query_chain_worker(chain_id, move |callback| {
562            ChainWorkerRequest::DescribeApplication {
563                application_id,
564                callback,
565            }
566        })
567        .await
568    }
569
570    /// Processes a confirmed block (aka a commit).
571    #[instrument(
572        level = "trace",
573        skip(self, certificate, notify_when_messages_are_delivered)
574    )]
575    async fn process_confirmed_block(
576        &self,
577        certificate: ConfirmedBlockCertificate,
578        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
579    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
580        let chain_id = certificate.block().header.chain_id;
581
582        let (response, actions) = self
583            .query_chain_worker(chain_id, move |callback| {
584                ChainWorkerRequest::ProcessConfirmedBlock {
585                    certificate,
586                    notify_when_messages_are_delivered,
587                    callback,
588                }
589            })
590            .await?;
591
592        #[cfg(with_metrics)]
593        metrics::NUM_BLOCKS.with_label_values(&[]).inc();
594
595        Ok((response, actions))
596    }
597
598    /// Processes a validated block issued from a multi-owner chain.
599    #[instrument(level = "trace", skip(self, certificate))]
600    async fn process_validated_block(
601        &self,
602        certificate: ValidatedBlockCertificate,
603    ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
604        let chain_id = certificate.block().header.chain_id;
605
606        self.query_chain_worker(chain_id, move |callback| {
607            ChainWorkerRequest::ProcessValidatedBlock {
608                certificate,
609                callback,
610            }
611        })
612        .await
613    }
614
615    /// Processes a leader timeout issued from a multi-owner chain.
616    #[instrument(level = "trace", skip(self, certificate))]
617    async fn process_timeout(
618        &self,
619        certificate: TimeoutCertificate,
620    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
621        let chain_id = certificate.value().chain_id();
622        self.query_chain_worker(chain_id, move |callback| {
623            ChainWorkerRequest::ProcessTimeout {
624                certificate,
625                callback,
626            }
627        })
628        .await
629    }
630
631    #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
632    async fn process_cross_chain_update(
633        &self,
634        origin: ChainId,
635        recipient: ChainId,
636        bundles: Vec<(Epoch, MessageBundle)>,
637    ) -> Result<Option<BlockHeight>, WorkerError> {
638        self.query_chain_worker(recipient, move |callback| {
639            ChainWorkerRequest::ProcessCrossChainUpdate {
640                origin,
641                bundles,
642                callback,
643            }
644        })
645        .await
646    }
647
648    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
649    #[instrument(level = "trace", skip(self, chain_id, height))]
650    #[cfg(with_testing)]
651    pub async fn read_certificate(
652        &self,
653        chain_id: ChainId,
654        height: BlockHeight,
655    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
656        self.query_chain_worker(chain_id, move |callback| {
657            ChainWorkerRequest::ReadCertificate { height, callback }
658        })
659        .await
660    }
661
662    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
663    /// [`ChainId`].
664    ///
665    /// The returned view holds a lock on the chain state, which prevents the worker from changing
666    /// the state of that chain.
667    #[instrument(level = "trace", skip(self))]
668    pub async fn chain_state_view(
669        &self,
670        chain_id: ChainId,
671    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
672        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
673            callback,
674        })
675        .await
676    }
677
678    #[instrument(level = "trace", skip(self, request_builder))]
679    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
680    async fn query_chain_worker<Response>(
681        &self,
682        chain_id: ChainId,
683        request_builder: impl FnOnce(
684            oneshot::Sender<Result<Response, WorkerError>>,
685        ) -> ChainWorkerRequest<StorageClient::Context>,
686    ) -> Result<Response, WorkerError> {
687        let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
688        let (callback, response) = oneshot::channel();
689
690        chain_actor
691            .send((request_builder(callback), tracing::Span::current()))
692            .expect("`ChainWorkerActor` stopped executing unexpectedly");
693
694        response
695            .await
696            .expect("`ChainWorkerActor` stopped executing without responding")
697    }
698
699    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it
700    /// to the cache if needed.
701    #[instrument(level = "trace", skip(self))]
702    async fn get_chain_worker_endpoint(
703        &self,
704        chain_id: ChainId,
705    ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
706        let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
707            loop {
708                match self.try_get_chain_worker_endpoint(chain_id) {
709                    Some(endpoint) => break endpoint,
710                    None => sleep(Duration::from_millis(250)).await,
711                }
712                warn!("No chain worker candidates found for eviction, retrying...");
713            }
714        })
715        .await
716        .map_err(|_| WorkerError::FullChainWorkerCache)?;
717
718        if let Some(receiver) = new_receiver {
719            let delivery_notifier = self
720                .delivery_notifiers
721                .lock()
722                .unwrap()
723                .entry(chain_id)
724                .or_default()
725                .clone();
726
727            let actor_task = ChainWorkerActor::run(
728                self.chain_worker_config.clone(),
729                self.storage.clone(),
730                self.block_cache.clone(),
731                self.execution_state_cache.clone(),
732                self.tracked_chains.clone(),
733                delivery_notifier,
734                chain_id,
735                receiver,
736            );
737
738            self.chain_worker_tasks
739                .lock()
740                .unwrap()
741                .spawn_task(actor_task);
742        }
743
744        Ok(sender)
745    }
746
747    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, attempting to create one
748    /// and add it to the cache if needed.
749    ///
750    /// Returns [`None`] if the cache is full and no candidate for eviction was found.
751    #[instrument(level = "trace", skip(self))]
752    #[expect(clippy::type_complexity)]
753    fn try_get_chain_worker_endpoint(
754        &self,
755        chain_id: ChainId,
756    ) -> Option<(
757        ChainActorEndpoint<StorageClient>,
758        Option<
759            mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
760        >,
761    )> {
762        let mut chain_workers = self.chain_workers.lock().unwrap();
763
764        if let Some(endpoint) = chain_workers.get(&chain_id) {
765            Some((endpoint.clone(), None))
766        } else {
767            if chain_workers.len() >= usize::from(chain_workers.cap()) {
768                let (chain_to_evict, _) = chain_workers
769                    .iter()
770                    .rev()
771                    .find(|(_, candidate_endpoint)| candidate_endpoint.strong_count() <= 1)?;
772                let chain_to_evict = *chain_to_evict;
773
774                chain_workers.pop(&chain_to_evict);
775                self.clean_up_finished_chain_workers(&chain_workers);
776            }
777
778            let (sender, receiver) = mpsc::unbounded_channel();
779            chain_workers.push(chain_id, sender.clone());
780
781            Some((sender, Some(receiver)))
782        }
783    }
784
785    /// Cleans up any finished chain workers and their delivery notifiers.
786    fn clean_up_finished_chain_workers(
787        &self,
788        active_chain_workers: &LruCache<ChainId, ChainActorEndpoint<StorageClient>>,
789    ) {
790        self.chain_worker_tasks
791            .lock()
792            .unwrap()
793            .reap_finished_tasks();
794
795        self.delivery_notifiers
796            .lock()
797            .unwrap()
798            .retain(|chain_id, notifier| {
799                !notifier.is_empty() || active_chain_workers.contains(chain_id)
800            });
801    }
802
803    #[instrument(skip_all, fields(
804        nick = self.nickname,
805        chain_id = format!("{:.8}", proposal.content.block.chain_id),
806        height = %proposal.content.block.height,
807    ))]
808    pub async fn handle_block_proposal(
809        &self,
810        proposal: BlockProposal,
811    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
812        trace!("{} <-- {:?}", self.nickname, proposal);
813        #[cfg(with_metrics)]
814        let round = proposal.content.round;
815        let response = self
816            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
817                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
818            })
819            .await?;
820        #[cfg(with_metrics)]
821        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
822            .with_label_values(&[round.type_name()])
823            .observe(round.number() as f64);
824        Ok(response)
825    }
826
827    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
828    // Other fields will be included in handle_certificate's span.
829    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
830    pub async fn handle_lite_certificate<'a>(
831        &self,
832        certificate: LiteCertificate<'a>,
833        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
834    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
835        match self.full_certificate(certificate).await? {
836            Either::Left(confirmed) => {
837                self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
838                    .await
839            }
840            Either::Right(validated) => {
841                if let Some(notifier) = notify_when_messages_are_delivered {
842                    // Nothing to wait for.
843                    if let Err(()) = notifier.send(()) {
844                        warn!("Failed to notify message delivery to caller");
845                    }
846                }
847                self.handle_validated_certificate(validated).await
848            }
849        }
850    }
851
852    /// Processes a confirmed block certificate.
853    #[instrument(skip_all, fields(
854        nick = self.nickname,
855        chain_id = format!("{:.8}", certificate.block().header.chain_id),
856        height = %certificate.block().header.height,
857    ))]
858    pub async fn handle_confirmed_certificate(
859        &self,
860        certificate: ConfirmedBlockCertificate,
861        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
862    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
863        trace!("{} <-- {:?}", self.nickname, certificate);
864        #[cfg(with_metrics)]
865        {
866            let confirmed_transactions = (certificate.block().body.incoming_bundles.len()
867                + certificate.block().body.operations.len())
868                as u64;
869
870            metrics::NUM_ROUNDS_IN_CERTIFICATE
871                .with_label_values(&[
872                    certificate.inner().to_log_str(),
873                    certificate.round.type_name(),
874                ])
875                .observe(certificate.round.number() as f64);
876            if confirmed_transactions > 0 {
877                metrics::TRANSACTION_COUNT
878                    .with_label_values(&[])
879                    .inc_by(confirmed_transactions);
880            }
881
882            for (validator_name, _) in certificate.signatures() {
883                metrics::CERTIFICATES_SIGNED
884                    .with_label_values(&[&validator_name.to_string()])
885                    .inc();
886            }
887        }
888
889        self.process_confirmed_block(certificate, notify_when_messages_are_delivered)
890            .await
891    }
892
893    /// Preprocesses a block without executing it. This does not update the execution state, but
894    /// can create cross-chain messages and store blobs. It also does _not_ check the signatures;
895    /// the caller is responsible for checking them using the correct committee.
896    #[instrument(skip_all, fields(
897        nick = self.nickname,
898        chain_id = format!("{:.8}", certificate.block().header.chain_id),
899        height = %certificate.block().header.height,
900    ))]
901    pub async fn fully_preprocess_certificate_with_notifications(
902        &self,
903        certificate: ConfirmedBlockCertificate,
904        notifier: &impl Notifier,
905    ) -> Result<(), WorkerError> {
906        trace!("{} <-- {:?} (preprocess)", self.nickname, certificate);
907
908        let notifications = (*notifier).clone();
909        let this = self.clone();
910        linera_base::task::spawn(async move {
911            let actions = this
912                .query_chain_worker(certificate.block().header.chain_id, move |callback| {
913                    ChainWorkerRequest::PreprocessCertificate {
914                        certificate,
915                        callback,
916                    }
917                })
918                .await?;
919            notifications.notify(&actions.notifications);
920            let mut requests = VecDeque::from(actions.cross_chain_requests);
921            while let Some(request) = requests.pop_front() {
922                let actions = this.handle_cross_chain_request(request).await?;
923                requests.extend(actions.cross_chain_requests);
924                notifications.notify(&actions.notifications);
925            }
926            Ok(())
927        })
928        .await
929        .unwrap_or_else(|_| Err(WorkerError::JoinError))
930    }
931
932    /// Processes a validated block certificate.
933    #[instrument(skip_all, fields(
934        nick = self.nickname,
935        chain_id = format!("{:.8}", certificate.block().header.chain_id),
936        height = %certificate.block().header.height,
937    ))]
938    pub async fn handle_validated_certificate(
939        &self,
940        certificate: ValidatedBlockCertificate,
941    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
942        trace!("{} <-- {:?}", self.nickname, certificate);
943
944        #[cfg(with_metrics)]
945        let round = certificate.round;
946        #[cfg(with_metrics)]
947        let cert_str = certificate.inner().to_log_str();
948
949        let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
950        #[cfg(with_metrics)]
951        {
952            if !_duplicated {
953                metrics::NUM_ROUNDS_IN_CERTIFICATE
954                    .with_label_values(&[cert_str, round.type_name()])
955                    .observe(round.number() as f64);
956            }
957        }
958        Ok((info, actions))
959    }
960
    /// Processes a timeout certificate.
    ///
    /// Logs the certificate at trace level and delegates to `process_timeout`, returning its
    /// result unchanged.
    #[instrument(skip_all, fields(
        nick = self.nickname,
        chain_id = format!("{:.8}", certificate.inner().chain_id()),
        height = %certificate.inner().height(),
    ))]
    pub async fn handle_timeout_certificate(
        &self,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        trace!("{} <-- {:?}", self.nickname, certificate);
        self.process_timeout(certificate).await
    }
974
975    #[instrument(skip_all, fields(
976        nick = self.nickname,
977        chain_id = format!("{:.8}", query.chain_id)
978    ))]
979    pub async fn handle_chain_info_query(
980        &self,
981        query: ChainInfoQuery,
982    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
983        trace!("{} <-- {:?}", self.nickname, query);
984        let result = self
985            .query_chain_worker(query.chain_id, move |callback| {
986                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
987            })
988            .await;
989        trace!("{} --> {:?}", self.nickname, result);
990        result
991    }
992
993    #[instrument(skip_all, fields(
994        nick = self.nickname,
995        chain_id = format!("{:.8}", chain_id)
996    ))]
997    pub async fn download_pending_blob(
998        &self,
999        chain_id: ChainId,
1000        blob_id: BlobId,
1001    ) -> Result<Blob, WorkerError> {
1002        trace!(
1003            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1004            self.nickname
1005        );
1006        let result = self
1007            .query_chain_worker(chain_id, move |callback| {
1008                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1009            })
1010            .await;
1011        trace!(
1012            "{} --> {:?}",
1013            self.nickname,
1014            result.as_ref().map(|_| blob_id)
1015        );
1016        result
1017    }
1018
1019    #[instrument(skip_all, fields(
1020        nick = self.nickname,
1021        chain_id = format!("{:.8}", chain_id)
1022    ))]
1023    pub async fn handle_pending_blob(
1024        &self,
1025        chain_id: ChainId,
1026        blob: Blob,
1027    ) -> Result<ChainInfoResponse, WorkerError> {
1028        let blob_id = blob.id();
1029        trace!(
1030            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1031            self.nickname
1032        );
1033        let result = self
1034            .query_chain_worker(chain_id, move |callback| {
1035                ChainWorkerRequest::HandlePendingBlob { blob, callback }
1036            })
1037            .await;
1038        trace!(
1039            "{} --> {:?}",
1040            self.nickname,
1041            result.as_ref().map(|_| blob_id)
1042        );
1043        result
1044    }
1045
1046    #[instrument(skip_all, fields(
1047        nick = self.nickname,
1048        chain_id = format!("{:.8}", request.target_chain_id())
1049    ))]
1050    pub async fn handle_cross_chain_request(
1051        &self,
1052        request: CrossChainRequest,
1053    ) -> Result<NetworkActions, WorkerError> {
1054        trace!("{} <-- {:?}", self.nickname, request);
1055        match request {
1056            CrossChainRequest::UpdateRecipient {
1057                sender,
1058                recipient,
1059                bundles,
1060            } => {
1061                let mut actions = NetworkActions::default();
1062                let origin = sender;
1063                let Some(height) = self
1064                    .process_cross_chain_update(origin, recipient, bundles)
1065                    .await?
1066                else {
1067                    return Ok(actions);
1068                };
1069                actions.notifications.push(Notification {
1070                    chain_id: recipient,
1071                    reason: Reason::NewIncomingBundle { origin, height },
1072                });
1073                actions
1074                    .cross_chain_requests
1075                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1076                        sender,
1077                        recipient,
1078                        latest_height: height,
1079                    });
1080                Ok(actions)
1081            }
1082            CrossChainRequest::ConfirmUpdatedRecipient {
1083                sender,
1084                recipient,
1085                latest_height,
1086            } => {
1087                self.query_chain_worker(sender, move |callback| {
1088                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1089                        recipient,
1090                        latest_height,
1091                        callback,
1092                    }
1093                })
1094                .await?;
1095                Ok(NetworkActions::default())
1096            }
1097        }
1098    }
1099
    /// Updates the received certificate trackers to at least the given values.
    ///
    /// Sends an `UpdateReceivedCertificateTrackers` request carrying `new_trackers` (one `u64`
    /// per validator public key) to the worker of `chain_id` and returns that worker's result
    /// unchanged.
    pub async fn update_received_certificate_trackers(
        &self,
        chain_id: ChainId,
        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
    ) -> Result<(), WorkerError> {
        self.query_chain_worker(chain_id, move |callback| {
            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
                new_trackers,
                callback,
            }
        })
        .await
    }
1114}
1115
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the validator's [`ValidatorPublicKey`], taken from the key pair in the chain
    /// worker configuration.
    ///
    /// The key is returned by value, not as a reference.
    ///
    /// # Panics
    ///
    /// If the validator doesn't have a key pair assigned to it.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        self.chain_worker_config
            .key_pair()
            .expect(
                "Test validator should have a key pair assigned to it \
                in order to obtain its public key",
            )
            .public()
    }
}