linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15    doc_scalar,
16    hashed::Hashed,
17    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
18    time::timer::{sleep, timeout},
19};
20#[cfg(with_testing)]
21use linera_chain::ChainExecutionContext;
22use linera_chain::{
23    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24    types::{
25        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
26        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
27    },
28    ChainError, ChainStateView,
29};
30use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
31use linera_storage::Storage;
32use linera_views::{context::InactiveContext, ViewError};
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
36use tracing::{error, instrument, trace, warn};
37
38use crate::{
39    chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
40    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
41    join_set_ext::{JoinSet, JoinSetExt},
42    notifier::Notifier,
43    value_cache::ValueCache,
44};
45
46#[cfg(test)]
47#[path = "unit_tests/worker_tests.rs"]
48mod worker_tests;
49
50#[cfg(with_metrics)]
51mod metrics {
52    use std::sync::LazyLock;
53
54    use linera_base::prometheus_util::{
55        exponential_bucket_interval, register_histogram_vec, register_int_counter,
56        register_int_counter_vec,
57    };
58    use prometheus::{HistogramVec, IntCounter, IntCounterVec};
59
60    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
61        register_histogram_vec(
62            "num_rounds_in_certificate",
63            "Number of rounds in certificate",
64            &["certificate_value", "round_type"],
65            exponential_bucket_interval(0.1, 50.0),
66        )
67    });
68
69    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
70        register_histogram_vec(
71            "num_rounds_in_block_proposal",
72            "Number of rounds in block proposal",
73            &["round_type"],
74            exponential_bucket_interval(0.1, 50.0),
75        )
76    });
77
78    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
79        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
80
81    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
82        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
83    });
84
85    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
86        register_int_counter_vec(
87            "certificates_signed",
88            "Number of confirmed block certificates signed by each validator",
89            &["validator_name"],
90        )
91    });
92
93    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
94        register_int_counter(
95            "chain_info_queries",
96            "Number of chain info queries processed",
97        )
98    });
99}
100
101/// Instruct the networking layer to send cross-chain requests and/or push notifications.
102#[derive(Default, Debug)]
103pub struct NetworkActions {
104    /// The cross-chain requests
105    pub cross_chain_requests: Vec<CrossChainRequest>,
106    /// The push notifications.
107    pub notifications: Vec<Notification>,
108}
109
110impl NetworkActions {
111    pub fn extend(&mut self, other: NetworkActions) {
112        self.cross_chain_requests.extend(other.cross_chain_requests);
113        self.notifications.extend(other.notifications);
114    }
115}
116
117#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
118/// Notification that a chain has a new certified block or a new message.
119pub struct Notification {
120    pub chain_id: ChainId,
121    pub reason: Reason,
122}
123
124doc_scalar!(
125    Notification,
126    "Notify that a chain has a new certified block or a new message"
127);
128
129#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
130/// Reason for the notification.
131pub enum Reason {
132    NewBlock {
133        height: BlockHeight,
134        hash: CryptoHash,
135    },
136    NewEvents {
137        height: BlockHeight,
138        hash: CryptoHash,
139        event_streams: BTreeSet<StreamId>,
140    },
141    NewIncomingBundle {
142        origin: ChainId,
143        height: BlockHeight,
144    },
145    NewRound {
146        height: BlockHeight,
147        round: Round,
148    },
149}
150
151/// Error type for worker operations.
152#[derive(Debug, Error)]
153pub enum WorkerError {
154    #[error(transparent)]
155    CryptoError(#[from] CryptoError),
156
157    #[error(transparent)]
158    ArithmeticError(#[from] ArithmeticError),
159
160    #[error(transparent)]
161    ViewError(#[from] ViewError),
162
163    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
164    ReadCertificatesError(Vec<CryptoHash>),
165
166    #[error(transparent)]
167    ChainError(#[from] Box<ChainError>),
168
169    // Chain access control
170    #[error("Block was not signed by an authorized owner")]
171    InvalidOwner,
172
173    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
174    InvalidSigner(AccountOwner),
175
176    // Chaining
177    #[error(
178        "Was expecting block height {expected_block_height} but found {found_block_height} instead"
179    )]
180    UnexpectedBlockHeight {
181        expected_block_height: BlockHeight,
182        found_block_height: BlockHeight,
183    },
184    #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
185    InvalidEpoch {
186        chain_id: ChainId,
187        chain_epoch: Epoch,
188        epoch: Epoch,
189    },
190
191    #[error("Events not found: {0:?}")]
192    EventsNotFound(Vec<EventId>),
193
194    // Other server-side errors
195    #[error("Invalid cross-chain request")]
196    InvalidCrossChainRequest,
197    #[error("The block does not contain the hash that we expected for the previous block")]
198    InvalidBlockChaining,
199    #[error(
200        "The given outcome is not what we computed after executing the block.\n\
201        Computed: {computed:#?}\n\
202        Submitted: {submitted:#?}"
203    )]
204    IncorrectOutcome {
205        computed: Box<BlockExecutionOutcome>,
206        submitted: Box<BlockExecutionOutcome>,
207    },
208    #[error("The block timestamp is in the future.")]
209    InvalidTimestamp,
210    #[error("We don't have the value for the certificate.")]
211    MissingCertificateValue,
212    #[error("The hash certificate doesn't match its value.")]
213    InvalidLiteCertificate,
214    #[error("Fast blocks cannot query oracles")]
215    FastBlockUsingOracles,
216    #[error("Blobs not found: {0:?}")]
217    BlobsNotFound(Vec<BlobId>),
218    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
219    ConfirmedLogEntryNotFound {
220        height: BlockHeight,
221        chain_id: ChainId,
222    },
223    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
224    PreprocessedBlocksEntryNotFound {
225        height: BlockHeight,
226        chain_id: ChainId,
227    },
228    #[error("The block proposal is invalid: {0}")]
229    InvalidBlockProposal(String),
230    #[error("The worker is too busy to handle new chains")]
231    FullChainWorkerCache,
232    #[error("Failed to join spawned worker task")]
233    JoinError,
234    #[error("Blob was not required by any pending block")]
235    UnexpectedBlob,
236    #[error("Number of published blobs per block must not exceed {0}")]
237    TooManyPublishedBlobs(u64),
238    #[error("Missing network description")]
239    MissingNetworkDescription,
240}
241
242impl From<ChainError> for WorkerError {
243    #[instrument(level = "trace", skip(chain_error))]
244    fn from(chain_error: ChainError) -> Self {
245        match chain_error {
246            ChainError::ExecutionError(execution_error, context) => {
247                if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
248                    Self::BlobsNotFound(blob_ids)
249                } else {
250                    Self::ChainError(Box::new(ChainError::ExecutionError(
251                        execution_error,
252                        context,
253                    )))
254                }
255            }
256            error => Self::ChainError(Box::new(error)),
257        }
258    }
259}
260
261#[cfg(with_testing)]
262impl WorkerError {
263    /// Returns the inner [`ExecutionError`] in this error.
264    ///
265    /// # Panics
266    ///
267    /// If this is not caused by an [`ExecutionError`].
268    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
269        let WorkerError::ChainError(chain_error) = self else {
270            panic!("Expected an `ExecutionError`. Got: {self:#?}");
271        };
272
273        let ChainError::ExecutionError(execution_error, context) = *chain_error else {
274            panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
275        };
276
277        assert_eq!(context, expected_context);
278
279        *execution_error
280    }
281}
282
283/// State of a worker in a validator or a local node.
284pub struct WorkerState<StorageClient>
285where
286    StorageClient: Storage,
287{
288    /// A name used for logging
289    nickname: String,
290    /// Access to local persistent storage.
291    storage: StorageClient,
292    /// Configuration options for the [`ChainWorker`]s.
293    chain_worker_config: ChainWorkerConfig,
294    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
295    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
296    /// Chain IDs that should be tracked by a worker.
297    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
298    /// One-shot channels to notify callers when messages of a particular chain have been
299    /// delivered.
300    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
301    /// The set of spawned [`ChainWorkerActor`] tasks.
302    chain_worker_tasks: Arc<Mutex<JoinSet>>,
303    /// The cache of running [`ChainWorkerActor`]s.
304    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
305}
306
307impl<StorageClient> Clone for WorkerState<StorageClient>
308where
309    StorageClient: Storage + Clone,
310{
311    fn clone(&self) -> Self {
312        WorkerState {
313            nickname: self.nickname.clone(),
314            storage: self.storage.clone(),
315            chain_worker_config: self.chain_worker_config.clone(),
316            block_cache: self.block_cache.clone(),
317            execution_state_cache: self.execution_state_cache.clone(),
318            tracked_chains: self.tracked_chains.clone(),
319            delivery_notifiers: self.delivery_notifiers.clone(),
320            chain_worker_tasks: self.chain_worker_tasks.clone(),
321            chain_workers: self.chain_workers.clone(),
322        }
323    }
324}
325
326/// The sender endpoint for [`ChainWorkerRequest`]s.
327type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
328    ChainWorkerRequest<<StorageClient as Storage>::Context>,
329    tracing::Span,
330)>;
331
332pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
333
334impl<StorageClient> WorkerState<StorageClient>
335where
336    StorageClient: Storage,
337{
338    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
339    pub fn new(
340        nickname: String,
341        key_pair: Option<ValidatorSecretKey>,
342        storage: StorageClient,
343    ) -> Self {
344        WorkerState {
345            nickname,
346            storage,
347            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
348            block_cache: Arc::new(ValueCache::default()),
349            execution_state_cache: Arc::new(ValueCache::default()),
350            tracked_chains: None,
351            delivery_notifiers: Arc::default(),
352            chain_worker_tasks: Arc::default(),
353            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
354        }
355    }
356
357    #[instrument(level = "trace", skip(nickname, storage))]
358    pub fn new_for_client(
359        nickname: String,
360        storage: StorageClient,
361        tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
362    ) -> Self {
363        WorkerState {
364            nickname,
365            storage,
366            chain_worker_config: ChainWorkerConfig::default(),
367            block_cache: Arc::new(ValueCache::default()),
368            execution_state_cache: Arc::new(ValueCache::default()),
369            tracked_chains: Some(tracked_chains),
370            delivery_notifiers: Arc::default(),
371            chain_worker_tasks: Arc::default(),
372            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
373        }
374    }
375
376    #[instrument(level = "trace", skip(self, value))]
377    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
378        self.chain_worker_config.allow_inactive_chains = value;
379        self
380    }
381
382    #[instrument(level = "trace", skip(self, value))]
383    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
384        self.chain_worker_config
385            .allow_messages_from_deprecated_epochs = value;
386        self
387    }
388
389    #[instrument(level = "trace", skip(self, value))]
390    pub fn with_long_lived_services(mut self, value: bool) -> Self {
391        self.chain_worker_config.long_lived_services = value;
392        self
393    }
394
395    /// Returns an instance with the specified grace period.
396    ///
397    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
398    /// will wait until that timestamp before voting.
399    #[instrument(level = "trace", skip(self))]
400    pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
401        self.chain_worker_config.grace_period = grace_period;
402        self
403    }
404
405    /// Returns an instance with the specified chain worker TTL.
406    ///
407    /// Idle chain workers free their memory after that duration without requests.
408    #[instrument(level = "trace", skip(self))]
409    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
410        self.chain_worker_config.ttl = chain_worker_ttl;
411        self
412    }
413
414    #[instrument(level = "trace", skip(self))]
415    pub fn nickname(&self) -> &str {
416        &self.nickname
417    }
418
419    /// Returns the storage client so that it can be manipulated or queried.
420    #[instrument(level = "trace", skip(self))]
421    #[cfg(not(feature = "test"))]
422    pub(crate) fn storage_client(&self) -> &StorageClient {
423        &self.storage
424    }
425
426    /// Returns the storage client so that it can be manipulated or queried by tests in other
427    /// crates.
428    #[instrument(level = "trace", skip(self))]
429    #[cfg(feature = "test")]
430    pub fn storage_client(&self) -> &StorageClient {
431        &self.storage
432    }
433
434    #[instrument(level = "trace", skip(self, certificate))]
435    pub(crate) async fn full_certificate(
436        &self,
437        certificate: LiteCertificate<'_>,
438    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
439        let block = self
440            .block_cache
441            .get(&certificate.value.value_hash)
442            .ok_or(WorkerError::MissingCertificateValue)?;
443
444        match certificate.value.kind {
445            linera_chain::types::CertificateKind::Confirmed => {
446                let value = ConfirmedBlock::from_hashed(block);
447                Ok(Either::Left(
448                    certificate
449                        .with_value(value)
450                        .ok_or(WorkerError::InvalidLiteCertificate)?,
451                ))
452            }
453            linera_chain::types::CertificateKind::Validated => {
454                let value = ValidatedBlock::from_hashed(block);
455                Ok(Either::Right(
456                    certificate
457                        .with_value(value)
458                        .ok_or(WorkerError::InvalidLiteCertificate)?,
459                ))
460            }
461            _ => return Err(WorkerError::InvalidLiteCertificate),
462        }
463    }
464}
465
466#[allow(async_fn_in_trait)]
467#[cfg_attr(not(web), trait_variant::make(Send))]
468pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
469    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
470        worker: &WorkerState<S>,
471        certificate: GenericCertificate<Self>,
472    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
473}
474
475impl ProcessableCertificate for ConfirmedBlock {
476    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
477        worker: &WorkerState<S>,
478        certificate: ConfirmedBlockCertificate,
479    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
480        worker.handle_confirmed_certificate(certificate, None).await
481    }
482}
483
484impl ProcessableCertificate for ValidatedBlock {
485    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
486        worker: &WorkerState<S>,
487        certificate: ValidatedBlockCertificate,
488    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
489        worker.handle_validated_certificate(certificate).await
490    }
491}
492
493impl ProcessableCertificate for Timeout {
494    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
495        worker: &WorkerState<S>,
496        certificate: TimeoutCertificate,
497    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
498        worker.handle_timeout_certificate(certificate).await
499    }
500}
501
502impl<StorageClient> WorkerState<StorageClient>
503where
504    StorageClient: Storage + Clone + Send + Sync + 'static,
505{
506    #[instrument(level = "trace", skip(self, certificate, notifier))]
507    #[inline]
508    pub async fn fully_handle_certificate_with_notifications<T>(
509        &self,
510        certificate: GenericCertificate<T>,
511        notifier: &impl Notifier,
512    ) -> Result<ChainInfoResponse, WorkerError>
513    where
514        T: ProcessableCertificate,
515    {
516        let notifications = (*notifier).clone();
517        let this = self.clone();
518        linera_base::task::spawn(async move {
519            let (response, actions) =
520                ProcessableCertificate::process_certificate(&this, certificate).await?;
521            notifications.notify(&actions.notifications);
522            let mut requests = VecDeque::from(actions.cross_chain_requests);
523            while let Some(request) = requests.pop_front() {
524                let actions = this.handle_cross_chain_request(request).await?;
525                requests.extend(actions.cross_chain_requests);
526                notifications.notify(&actions.notifications);
527            }
528            Ok(response)
529        })
530        .await
531        .unwrap_or_else(|_| Err(WorkerError::JoinError))
532    }
533
534    /// Tries to execute a block proposal without any verification other than block execution.
535    #[instrument(level = "trace", skip(self, block))]
536    pub async fn stage_block_execution(
537        &self,
538        block: ProposedBlock,
539        round: Option<u32>,
540        published_blobs: Vec<Blob>,
541    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
542        self.query_chain_worker(block.chain_id, move |callback| {
543            ChainWorkerRequest::StageBlockExecution {
544                block,
545                round,
546                published_blobs,
547                callback,
548            }
549        })
550        .await
551    }
552
553    /// Executes a [`Query`] for an application's state on a specific chain.
554    #[instrument(level = "trace", skip(self, chain_id, query))]
555    pub async fn query_application(
556        &self,
557        chain_id: ChainId,
558        query: Query,
559    ) -> Result<QueryOutcome, WorkerError> {
560        self.query_chain_worker(chain_id, move |callback| {
561            ChainWorkerRequest::QueryApplication { query, callback }
562        })
563        .await
564    }
565
566    #[instrument(level = "trace", skip(self, chain_id, application_id))]
567    pub async fn describe_application(
568        &self,
569        chain_id: ChainId,
570        application_id: ApplicationId,
571    ) -> Result<ApplicationDescription, WorkerError> {
572        self.query_chain_worker(chain_id, move |callback| {
573            ChainWorkerRequest::DescribeApplication {
574                application_id,
575                callback,
576            }
577        })
578        .await
579    }
580
581    /// Processes a confirmed block (aka a commit).
582    #[instrument(
583        level = "trace",
584        skip(self, certificate, notify_when_messages_are_delivered)
585    )]
586    async fn process_confirmed_block(
587        &self,
588        certificate: ConfirmedBlockCertificate,
589        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
590    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
591        let chain_id = certificate.block().header.chain_id;
592        self.query_chain_worker(chain_id, move |callback| {
593            ChainWorkerRequest::ProcessConfirmedBlock {
594                certificate,
595                notify_when_messages_are_delivered,
596                callback,
597            }
598        })
599        .await
600    }
601
602    /// Processes a validated block issued from a multi-owner chain.
603    #[instrument(level = "trace", skip(self, certificate))]
604    async fn process_validated_block(
605        &self,
606        certificate: ValidatedBlockCertificate,
607    ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
608        let chain_id = certificate.block().header.chain_id;
609        self.query_chain_worker(chain_id, move |callback| {
610            ChainWorkerRequest::ProcessValidatedBlock {
611                certificate,
612                callback,
613            }
614        })
615        .await
616    }
617
618    /// Processes a leader timeout issued from a multi-owner chain.
619    #[instrument(level = "trace", skip(self, certificate))]
620    async fn process_timeout(
621        &self,
622        certificate: TimeoutCertificate,
623    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
624        let chain_id = certificate.value().chain_id();
625        self.query_chain_worker(chain_id, move |callback| {
626            ChainWorkerRequest::ProcessTimeout {
627                certificate,
628                callback,
629            }
630        })
631        .await
632    }
633
634    #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
635    async fn process_cross_chain_update(
636        &self,
637        origin: ChainId,
638        recipient: ChainId,
639        bundles: Vec<(Epoch, MessageBundle)>,
640    ) -> Result<Option<BlockHeight>, WorkerError> {
641        self.query_chain_worker(recipient, move |callback| {
642            ChainWorkerRequest::ProcessCrossChainUpdate {
643                origin,
644                bundles,
645                callback,
646            }
647        })
648        .await
649    }
650
651    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
652    #[instrument(level = "trace", skip(self, chain_id, height))]
653    #[cfg(with_testing)]
654    pub async fn read_certificate(
655        &self,
656        chain_id: ChainId,
657        height: BlockHeight,
658    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
659        self.query_chain_worker(chain_id, move |callback| {
660            ChainWorkerRequest::ReadCertificate { height, callback }
661        })
662        .await
663    }
664
665    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
666    /// [`ChainId`].
667    ///
668    /// The returned view holds a lock on the chain state, which prevents the worker from changing
669    /// the state of that chain.
670    #[instrument(level = "trace", skip(self))]
671    pub async fn chain_state_view(
672        &self,
673        chain_id: ChainId,
674    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
675        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
676            callback,
677        })
678        .await
679    }
680
681    #[instrument(level = "trace", skip(self, request_builder))]
682    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
683    async fn query_chain_worker<Response>(
684        &self,
685        chain_id: ChainId,
686        request_builder: impl FnOnce(
687            oneshot::Sender<Result<Response, WorkerError>>,
688        ) -> ChainWorkerRequest<StorageClient::Context>,
689    ) -> Result<Response, WorkerError> {
690        let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
691        let (callback, response) = oneshot::channel();
692
693        chain_actor
694            .send((request_builder(callback), tracing::Span::current()))
695            .expect("`ChainWorkerActor` stopped executing unexpectedly");
696
697        response
698            .await
699            .expect("`ChainWorkerActor` stopped executing without responding")
700    }
701
702    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it
703    /// to the cache if needed.
704    #[instrument(level = "trace", skip(self))]
705    async fn get_chain_worker_endpoint(
706        &self,
707        chain_id: ChainId,
708    ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
709        let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
710            loop {
711                match self.try_get_chain_worker_endpoint(chain_id) {
712                    Some(endpoint) => break endpoint,
713                    None => sleep(Duration::from_millis(250)).await,
714                }
715            }
716        })
717        .await
718        .map_err(|_| WorkerError::FullChainWorkerCache)?;
719
720        if let Some(receiver) = new_receiver {
721            let delivery_notifier = self
722                .delivery_notifiers
723                .lock()
724                .unwrap()
725                .entry(chain_id)
726                .or_default()
727                .clone();
728
729            let actor_task = ChainWorkerActor::run(
730                self.chain_worker_config.clone(),
731                self.storage.clone(),
732                self.block_cache.clone(),
733                self.execution_state_cache.clone(),
734                self.tracked_chains.clone(),
735                delivery_notifier,
736                chain_id,
737                receiver,
738            );
739
740            self.chain_worker_tasks
741                .lock()
742                .unwrap()
743                .spawn_task(actor_task);
744        }
745
746        Ok(sender)
747    }
748
749    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, attempting to create one
750    /// and add it to the cache if needed.
751    ///
752    /// Returns [`None`] if the cache is full and no candidate for eviction was found.
753    #[instrument(level = "trace", skip(self))]
754    #[expect(clippy::type_complexity)]
755    fn try_get_chain_worker_endpoint(
756        &self,
757        chain_id: ChainId,
758    ) -> Option<(
759        ChainActorEndpoint<StorageClient>,
760        Option<
761            mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
762        >,
763    )> {
764        let mut chain_workers = self.chain_workers.lock().unwrap();
765
766        if let Some(endpoint) = chain_workers.get(&chain_id) {
767            Some((endpoint.clone(), None))
768        } else {
769            let (sender, receiver) = mpsc::unbounded_channel();
770            chain_workers.insert(chain_id, sender.clone());
771            Some((sender, Some(receiver)))
772        }
773    }
774
775    #[instrument(skip_all, fields(
776        nick = self.nickname,
777        chain_id = format!("{:.8}", proposal.content.block.chain_id),
778        height = %proposal.content.block.height,
779    ))]
780    pub async fn handle_block_proposal(
781        &self,
782        proposal: BlockProposal,
783    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
784        trace!("{} <-- {:?}", self.nickname, proposal);
785        #[cfg(with_metrics)]
786        let round = proposal.content.round;
787        let response = self
788            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
789                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
790            })
791            .await?;
792        #[cfg(with_metrics)]
793        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
794            .with_label_values(&[round.type_name()])
795            .observe(round.number() as f64);
796        Ok(response)
797    }
798
799    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
800    // Other fields will be included in handle_certificate's span.
801    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
802    pub async fn handle_lite_certificate(
803        &self,
804        certificate: LiteCertificate<'_>,
805        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
806    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
807        match self.full_certificate(certificate).await? {
808            Either::Left(confirmed) => {
809                self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
810                    .await
811            }
812            Either::Right(validated) => {
813                if let Some(notifier) = notify_when_messages_are_delivered {
814                    // Nothing to wait for.
815                    if let Err(()) = notifier.send(()) {
816                        warn!("Failed to notify message delivery to caller");
817                    }
818                }
819                self.handle_validated_certificate(validated).await
820            }
821        }
822    }
823
824    /// Processes a confirmed block certificate.
825    #[instrument(skip_all, fields(
826        nick = self.nickname,
827        chain_id = format!("{:.8}", certificate.block().header.chain_id),
828        height = %certificate.block().header.height,
829    ))]
830    pub async fn handle_confirmed_certificate(
831        &self,
832        certificate: ConfirmedBlockCertificate,
833        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
834    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
835        trace!("{} <-- {:?}", self.nickname, certificate);
836        #[cfg(with_metrics)]
837        let metrics_data = if self
838            .chain_state_view(certificate.block().header.chain_id)
839            .await?
840            .tip_state
841            .get()
842            .next_block_height
843            == certificate.block().header.height
844        {
845            Some((
846                certificate.inner().to_log_str(),
847                certificate.round.type_name(),
848                certificate.round.number(),
849                certificate.block().body.transactions.len() as u64,
850                certificate
851                    .signatures()
852                    .iter()
853                    .map(|(validator_name, _)| validator_name.to_string())
854                    .collect::<Vec<_>>(),
855            ))
856        } else {
857            // Block already processed or will only be preprocessed, no metrics to report.
858            None
859        };
860
861        let result = self
862            .process_confirmed_block(certificate, notify_when_messages_are_delivered)
863            .await?;
864
865        #[cfg(with_metrics)]
866        {
867            if let Some(metrics_data) = metrics_data {
868                let (
869                    certificate_log_str,
870                    round_type,
871                    round_number,
872                    confirmed_transactions,
873                    validators_with_signatures,
874                ) = metrics_data;
875                metrics::NUM_BLOCKS.with_label_values(&[]).inc();
876                metrics::NUM_ROUNDS_IN_CERTIFICATE
877                    .with_label_values(&[certificate_log_str, round_type])
878                    .observe(round_number as f64);
879                if confirmed_transactions > 0 {
880                    metrics::TRANSACTION_COUNT
881                        .with_label_values(&[])
882                        .inc_by(confirmed_transactions);
883                }
884
885                for validator_name in validators_with_signatures {
886                    metrics::CERTIFICATES_SIGNED
887                        .with_label_values(&[&validator_name])
888                        .inc();
889                }
890            }
891        }
892        Ok(result)
893    }
894
895    /// Processes a validated block certificate.
896    #[instrument(skip_all, fields(
897        nick = self.nickname,
898        chain_id = format!("{:.8}", certificate.block().header.chain_id),
899        height = %certificate.block().header.height,
900    ))]
901    pub async fn handle_validated_certificate(
902        &self,
903        certificate: ValidatedBlockCertificate,
904    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
905        trace!("{} <-- {:?}", self.nickname, certificate);
906
907        #[cfg(with_metrics)]
908        let round = certificate.round;
909        #[cfg(with_metrics)]
910        let cert_str = certificate.inner().to_log_str();
911
912        let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
913        #[cfg(with_metrics)]
914        {
915            if !_duplicated {
916                metrics::NUM_ROUNDS_IN_CERTIFICATE
917                    .with_label_values(&[cert_str, round.type_name()])
918                    .observe(round.number() as f64);
919            }
920        }
921        Ok((info, actions))
922    }
923
924    /// Processes a timeout certificate
925    #[instrument(skip_all, fields(
926        nick = self.nickname,
927        chain_id = format!("{:.8}", certificate.inner().chain_id()),
928        height = %certificate.inner().height(),
929    ))]
930    pub async fn handle_timeout_certificate(
931        &self,
932        certificate: TimeoutCertificate,
933    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
934        trace!("{} <-- {:?}", self.nickname, certificate);
935        self.process_timeout(certificate).await
936    }
937
938    #[instrument(skip_all, fields(
939        nick = self.nickname,
940        chain_id = format!("{:.8}", query.chain_id)
941    ))]
942    pub async fn handle_chain_info_query(
943        &self,
944        query: ChainInfoQuery,
945    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
946        trace!("{} <-- {:?}", self.nickname, query);
947        #[cfg(with_metrics)]
948        metrics::CHAIN_INFO_QUERIES.inc();
949        let result = self
950            .query_chain_worker(query.chain_id, move |callback| {
951                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
952            })
953            .await;
954        trace!("{} --> {:?}", self.nickname, result);
955        result
956    }
957
958    #[instrument(skip_all, fields(
959        nick = self.nickname,
960        chain_id = format!("{:.8}", chain_id)
961    ))]
962    pub async fn download_pending_blob(
963        &self,
964        chain_id: ChainId,
965        blob_id: BlobId,
966    ) -> Result<Blob, WorkerError> {
967        trace!(
968            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
969            self.nickname
970        );
971        let result = self
972            .query_chain_worker(chain_id, move |callback| {
973                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
974            })
975            .await;
976        trace!(
977            "{} --> {:?}",
978            self.nickname,
979            result.as_ref().map(|_| blob_id)
980        );
981        result
982    }
983
984    #[instrument(skip_all, fields(
985        nick = self.nickname,
986        chain_id = format!("{:.8}", chain_id)
987    ))]
988    pub async fn handle_pending_blob(
989        &self,
990        chain_id: ChainId,
991        blob: Blob,
992    ) -> Result<ChainInfoResponse, WorkerError> {
993        let blob_id = blob.id();
994        trace!(
995            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
996            self.nickname
997        );
998        let result = self
999            .query_chain_worker(chain_id, move |callback| {
1000                ChainWorkerRequest::HandlePendingBlob { blob, callback }
1001            })
1002            .await;
1003        trace!(
1004            "{} --> {:?}",
1005            self.nickname,
1006            result.as_ref().map(|_| blob_id)
1007        );
1008        result
1009    }
1010
1011    #[instrument(skip_all, fields(
1012        nick = self.nickname,
1013        chain_id = format!("{:.8}", request.target_chain_id())
1014    ))]
1015    pub async fn handle_cross_chain_request(
1016        &self,
1017        request: CrossChainRequest,
1018    ) -> Result<NetworkActions, WorkerError> {
1019        trace!("{} <-- {:?}", self.nickname, request);
1020        match request {
1021            CrossChainRequest::UpdateRecipient {
1022                sender,
1023                recipient,
1024                bundles,
1025            } => {
1026                let mut actions = NetworkActions::default();
1027                let origin = sender;
1028                let Some(height) = self
1029                    .process_cross_chain_update(origin, recipient, bundles)
1030                    .await?
1031                else {
1032                    return Ok(actions);
1033                };
1034                actions.notifications.push(Notification {
1035                    chain_id: recipient,
1036                    reason: Reason::NewIncomingBundle { origin, height },
1037                });
1038                actions
1039                    .cross_chain_requests
1040                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1041                        sender,
1042                        recipient,
1043                        latest_height: height,
1044                    });
1045                Ok(actions)
1046            }
1047            CrossChainRequest::ConfirmUpdatedRecipient {
1048                sender,
1049                recipient,
1050                latest_height,
1051            } => {
1052                self.query_chain_worker(sender, move |callback| {
1053                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1054                        recipient,
1055                        latest_height,
1056                        callback,
1057                    }
1058                })
1059                .await?;
1060                Ok(NetworkActions::default())
1061            }
1062        }
1063    }
1064
1065    /// Updates the received certificate trackers to at least the given values.
1066    pub async fn update_received_certificate_trackers(
1067        &self,
1068        chain_id: ChainId,
1069        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1070    ) -> Result<(), WorkerError> {
1071        self.query_chain_worker(chain_id, move |callback| {
1072            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1073                new_trackers,
1074                callback,
1075            }
1076        })
1077        .await
1078    }
1079}
1080
1081#[cfg(with_testing)]
1082impl<StorageClient> WorkerState<StorageClient>
1083where
1084    StorageClient: Storage,
1085{
1086    /// Gets a reference to the validator's [`ValidatorPublicKey`].
1087    ///
1088    /// # Panics
1089    ///
1090    /// If the validator doesn't have a key pair assigned to it.
1091    #[instrument(level = "trace", skip(self))]
1092    pub fn public_key(&self) -> ValidatorPublicKey {
1093        self.chain_worker_config
1094            .key_pair()
1095            .expect(
1096                "Test validator should have a key pair assigned to it \
1097                in order to obtain it's public key",
1098            )
1099            .public()
1100    }
1101}