linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15    doc_scalar,
16    hashed::Hashed,
17    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId},
18    time::timer::{sleep, timeout},
19};
20#[cfg(with_testing)]
21use linera_chain::ChainExecutionContext;
22use linera_chain::{
23    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24    types::{
25        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
26        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
27    },
28    ChainError, ChainStateView,
29};
30use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
31use linera_storage::Storage;
32use linera_views::ViewError;
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
36use tracing::{error, instrument, trace, warn};
37
38use crate::{
39    chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
40    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
41    join_set_ext::{JoinSet, JoinSetExt},
42    notifier::Notifier,
43    value_cache::ValueCache,
44};
45
46#[cfg(test)]
47#[path = "unit_tests/worker_tests.rs"]
48mod worker_tests;
49
/// Prometheus metrics recorded by the worker; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Histogram of the round number carried by processed certificates, labelled by
    /// certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number carried by handled block proposals, labelled by
    /// round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Counter increased by the number of incoming bundles plus operations of each
    /// confirmed block certificate that is handled.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Counter increased once per confirmed block successfully processed by a chain worker.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Counter increased once per signature found in a handled confirmed block
    /// certificate, labelled by the signing validator.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });
}
92
/// Instruct the networking layer to send cross-chain requests and/or push notifications.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// The cross-chain requests.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// The push notifications.
    pub notifications: Vec<Notification>,
}
101
102impl NetworkActions {
103    pub fn extend(&mut self, other: NetworkActions) {
104        self.cross_chain_requests.extend(other.cross_chain_requests);
105        self.notifications.extend(other.notifications);
106    }
107}
108
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// Notification that a chain has a new certified block or a new message.
pub struct Notification {
    /// The chain this notification is about.
    pub chain_id: ChainId,
    /// Why the notification was emitted.
    pub reason: Reason,
}

doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
120
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// Reason for the notification.
pub enum Reason {
    /// Presumably: a new block with hash `hash` exists at `height` on the notified chain
    /// (TODO confirm against the code that emits it).
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// Presumably: a new incoming message bundle from chain `origin`, sent at `height`
    /// (TODO confirm which chain `height` refers to).
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// Presumably: the notified chain entered `round` at `height` (TODO confirm).
    NewRound {
        height: BlockHeight,
        round: Round,
    },
}
137
/// Error type for worker operations.
///
/// Several variants wrap lower-level errors transparently (`#[error(transparent)]`), so
/// their display text is the text of the wrapped error.
#[derive(Debug, Error)]
pub enum WorkerError {
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    // Boxed chain error. Note that the hand-written `From<ChainError>` impl in this file
    // maps an execution error about missing blobs to `BlobsNotFound` instead.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    // Chain access control
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    #[error(
        "Was expecting block height {expected_block_height} but found {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    #[error(
        "The given outcome is not what we computed after executing the block.\n\
        Computed: {computed:#?}\n\
        Submitted: {submitted:#?}"
    )]
    IncorrectOutcome {
        computed: Box<BlockExecutionOutcome>,
        submitted: Box<BlockExecutionOutcome>,
    },
    #[error("The block timestamp is in the future.")]
    InvalidTimestamp,
    // Returned when the block for a lite certificate is not in the worker's block cache.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    // Returned when a lite certificate cannot be combined with the cached value, or has
    // a kind other than confirmed/validated.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    // NOTE(review): `{chain_id:8}` requests a minimum width of 8, whereas the tracing
    // fields in this file use `{:.8}` (precision) for chain IDs — confirm which is intended.
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    // NOTE(review): same `{chain_id:8}` width-vs-precision question as above.
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    // Returned when no chain worker endpoint could be obtained before the lookup timeout.
    #[error("The worker is too busy to handle new chains")]
    FullChainWorkerCache,
    // Returned when a spawned certificate-handling task could not be joined.
    #[error("Failed to join spawned worker task")]
    JoinError,
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error("Missing network description")]
    MissingNetworkDescription,
}
228
229impl From<ChainError> for WorkerError {
230    #[instrument(level = "trace", skip(chain_error))]
231    fn from(chain_error: ChainError) -> Self {
232        match chain_error {
233            ChainError::ExecutionError(execution_error, context) => {
234                if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
235                    Self::BlobsNotFound(blob_ids)
236                } else {
237                    Self::ChainError(Box::new(ChainError::ExecutionError(
238                        execution_error,
239                        context,
240                    )))
241                }
242            }
243            error => Self::ChainError(Box::new(error)),
244        }
245    }
246}
247
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] carried by this error, checking that it was raised
    /// in `expected_context`.
    ///
    /// # Panics
    ///
    /// If this error does not wrap a chain-level [`ExecutionError`], or if the recorded
    /// context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
269
/// State of a worker in a validator or a local node.
///
/// All shared members are behind `Arc`s, so clones of a `WorkerState` share the same
/// caches, notifiers, tasks and chain worker endpoints.
pub struct WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// A name used for logging
    nickname: String,
    /// Access to local persistent storage.
    storage: StorageClient,
    /// Configuration options for the [`ChainWorker`]s.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of hashed blocks keyed by hash; used to rebuild full certificates from lite
    /// ones and handed to every spawned chain worker actor.
    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Cache of execution state views keyed by hash; handed to every spawned chain
    /// worker actor.
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
    /// Chain IDs that should be tracked by a worker.
    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
    /// One-shot channels to notify callers when messages of a particular chain have been
    /// delivered.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The set of spawned [`ChainWorkerActor`] tasks.
    chain_worker_tasks: Arc<Mutex<JoinSet>>,
    /// The cache of running [`ChainWorkerActor`]s.
    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
}
293
294impl<StorageClient> Clone for WorkerState<StorageClient>
295where
296    StorageClient: Storage + Clone,
297{
298    fn clone(&self) -> Self {
299        WorkerState {
300            nickname: self.nickname.clone(),
301            storage: self.storage.clone(),
302            chain_worker_config: self.chain_worker_config.clone(),
303            block_cache: self.block_cache.clone(),
304            execution_state_cache: self.execution_state_cache.clone(),
305            tracked_chains: self.tracked_chains.clone(),
306            delivery_notifiers: self.delivery_notifiers.clone(),
307            chain_worker_tasks: self.chain_worker_tasks.clone(),
308            chain_workers: self.chain_workers.clone(),
309        }
310    }
311}
312
/// The sender endpoint for [`ChainWorkerRequest`]s.
///
/// Each message pairs the request with the [`tracing::Span`] that was current when the
/// request was sent.
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
    ChainWorkerRequest<<StorageClient as Storage>::Context>,
    tracing::Span,
)>;

/// The [`DeliveryNotifier`] of each chain, keyed by [`ChainId`].
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
320
321impl<StorageClient> WorkerState<StorageClient>
322where
323    StorageClient: Storage,
324{
325    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
326    pub fn new(
327        nickname: String,
328        key_pair: Option<ValidatorSecretKey>,
329        storage: StorageClient,
330    ) -> Self {
331        WorkerState {
332            nickname,
333            storage,
334            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
335            block_cache: Arc::new(ValueCache::default()),
336            execution_state_cache: Arc::new(ValueCache::default()),
337            tracked_chains: None,
338            delivery_notifiers: Arc::default(),
339            chain_worker_tasks: Arc::default(),
340            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
341        }
342    }
343
344    #[instrument(level = "trace", skip(nickname, storage))]
345    pub fn new_for_client(
346        nickname: String,
347        storage: StorageClient,
348        tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
349    ) -> Self {
350        WorkerState {
351            nickname,
352            storage,
353            chain_worker_config: ChainWorkerConfig::default(),
354            block_cache: Arc::new(ValueCache::default()),
355            execution_state_cache: Arc::new(ValueCache::default()),
356            tracked_chains: Some(tracked_chains),
357            delivery_notifiers: Arc::default(),
358            chain_worker_tasks: Arc::default(),
359            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
360        }
361    }
362
363    #[instrument(level = "trace", skip(self, value))]
364    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
365        self.chain_worker_config.allow_inactive_chains = value;
366        self
367    }
368
369    #[instrument(level = "trace", skip(self, value))]
370    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
371        self.chain_worker_config
372            .allow_messages_from_deprecated_epochs = value;
373        self
374    }
375
376    #[instrument(level = "trace", skip(self, value))]
377    pub fn with_long_lived_services(mut self, value: bool) -> Self {
378        self.chain_worker_config.long_lived_services = value;
379        self
380    }
381
382    /// Returns an instance with the specified grace period.
383    ///
384    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
385    /// will wait until that timestamp before voting.
386    #[instrument(level = "trace", skip(self))]
387    pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
388        self.chain_worker_config.grace_period = grace_period;
389        self
390    }
391
392    /// Returns an instance with the specified chain worker TTL.
393    ///
394    /// Idle chain workers free their memory after that duration without requests.
395    #[instrument(level = "trace", skip(self))]
396    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
397        self.chain_worker_config.ttl = chain_worker_ttl;
398        self
399    }
400
401    #[instrument(level = "trace", skip(self))]
402    pub fn nickname(&self) -> &str {
403        &self.nickname
404    }
405
406    /// Returns the storage client so that it can be manipulated or queried.
407    #[instrument(level = "trace", skip(self))]
408    #[cfg(not(feature = "test"))]
409    pub(crate) fn storage_client(&self) -> &StorageClient {
410        &self.storage
411    }
412
413    /// Returns the storage client so that it can be manipulated or queried by tests in other
414    /// crates.
415    #[instrument(level = "trace", skip(self))]
416    #[cfg(feature = "test")]
417    pub fn storage_client(&self) -> &StorageClient {
418        &self.storage
419    }
420
421    #[instrument(level = "trace", skip(self, key_pair))]
422    #[cfg(test)]
423    pub(crate) async fn with_key_pair(mut self, key_pair: Option<Arc<ValidatorSecretKey>>) -> Self {
424        self.chain_worker_config.key_pair = key_pair;
425        self.chain_workers.lock().unwrap().clear();
426        self
427    }
428
429    #[instrument(level = "trace", skip(self, certificate))]
430    pub(crate) async fn full_certificate(
431        &self,
432        certificate: LiteCertificate<'_>,
433    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
434        let block = self
435            .block_cache
436            .get(&certificate.value.value_hash)
437            .ok_or(WorkerError::MissingCertificateValue)?;
438
439        match certificate.value.kind {
440            linera_chain::types::CertificateKind::Confirmed => {
441                let value = ConfirmedBlock::from_hashed(block);
442                Ok(Either::Left(
443                    certificate
444                        .with_value(value)
445                        .ok_or(WorkerError::InvalidLiteCertificate)?,
446                ))
447            }
448            linera_chain::types::CertificateKind::Validated => {
449                let value = ValidatedBlock::from_hashed(block);
450                Ok(Either::Right(
451                    certificate
452                        .with_value(value)
453                        .ok_or(WorkerError::InvalidLiteCertificate)?,
454                ))
455            }
456            _ => return Err(WorkerError::InvalidLiteCertificate),
457        }
458    }
459}
460
/// A certificate value whose certificates can be processed by a [`WorkerState`].
///
/// The implementations in this file forward to the worker's matching
/// `handle_*_certificate` method.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` on `worker` and returns the resulting chain info together
    /// with the network actions to perform.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
469
470impl ProcessableCertificate for ConfirmedBlock {
471    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
472        worker: &WorkerState<S>,
473        certificate: ConfirmedBlockCertificate,
474    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
475        worker.handle_confirmed_certificate(certificate, None).await
476    }
477}
478
479impl ProcessableCertificate for ValidatedBlock {
480    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
481        worker: &WorkerState<S>,
482        certificate: ValidatedBlockCertificate,
483    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
484        worker.handle_validated_certificate(certificate).await
485    }
486}
487
488impl ProcessableCertificate for Timeout {
489    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
490        worker: &WorkerState<S>,
491        certificate: TimeoutCertificate,
492    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
493        worker.handle_timeout_certificate(certificate).await
494    }
495}
496
497impl<StorageClient> WorkerState<StorageClient>
498where
499    StorageClient: Storage + Clone + Send + Sync + 'static,
500{
501    #[instrument(level = "trace", skip(self, certificate, notifier))]
502    #[inline]
503    pub async fn fully_handle_certificate_with_notifications<T>(
504        &self,
505        certificate: GenericCertificate<T>,
506        notifier: &impl Notifier,
507    ) -> Result<ChainInfoResponse, WorkerError>
508    where
509        T: ProcessableCertificate,
510    {
511        let notifications = (*notifier).clone();
512        let this = self.clone();
513        linera_base::task::spawn(async move {
514            let (response, actions) =
515                ProcessableCertificate::process_certificate(&this, certificate).await?;
516            notifications.notify(&actions.notifications);
517            let mut requests = VecDeque::from(actions.cross_chain_requests);
518            while let Some(request) = requests.pop_front() {
519                let actions = this.handle_cross_chain_request(request).await?;
520                requests.extend(actions.cross_chain_requests);
521                notifications.notify(&actions.notifications);
522            }
523            Ok(response)
524        })
525        .await
526        .unwrap_or_else(|_| Err(WorkerError::JoinError))
527    }
528
529    /// Tries to execute a block proposal without any verification other than block execution.
530    #[instrument(level = "trace", skip(self, block))]
531    pub async fn stage_block_execution(
532        &self,
533        block: ProposedBlock,
534        round: Option<u32>,
535        published_blobs: Vec<Blob>,
536    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
537        self.query_chain_worker(block.chain_id, move |callback| {
538            ChainWorkerRequest::StageBlockExecution {
539                block,
540                round,
541                published_blobs,
542                callback,
543            }
544        })
545        .await
546    }
547
548    /// Executes a [`Query`] for an application's state on a specific chain.
549    #[instrument(level = "trace", skip(self, chain_id, query))]
550    pub async fn query_application(
551        &self,
552        chain_id: ChainId,
553        query: Query,
554    ) -> Result<QueryOutcome, WorkerError> {
555        self.query_chain_worker(chain_id, move |callback| {
556            ChainWorkerRequest::QueryApplication { query, callback }
557        })
558        .await
559    }
560
561    #[instrument(level = "trace", skip(self, chain_id, application_id))]
562    pub async fn describe_application(
563        &self,
564        chain_id: ChainId,
565        application_id: ApplicationId,
566    ) -> Result<ApplicationDescription, WorkerError> {
567        self.query_chain_worker(chain_id, move |callback| {
568            ChainWorkerRequest::DescribeApplication {
569                application_id,
570                callback,
571            }
572        })
573        .await
574    }
575
576    /// Processes a confirmed block (aka a commit).
577    #[instrument(
578        level = "trace",
579        skip(self, certificate, notify_when_messages_are_delivered)
580    )]
581    async fn process_confirmed_block(
582        &self,
583        certificate: ConfirmedBlockCertificate,
584        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
585    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
586        let chain_id = certificate.block().header.chain_id;
587
588        let (response, actions) = self
589            .query_chain_worker(chain_id, move |callback| {
590                ChainWorkerRequest::ProcessConfirmedBlock {
591                    certificate,
592                    notify_when_messages_are_delivered,
593                    callback,
594                }
595            })
596            .await?;
597
598        #[cfg(with_metrics)]
599        metrics::NUM_BLOCKS.with_label_values(&[]).inc();
600
601        Ok((response, actions))
602    }
603
604    /// Processes a validated block issued from a multi-owner chain.
605    #[instrument(level = "trace", skip(self, certificate))]
606    async fn process_validated_block(
607        &self,
608        certificate: ValidatedBlockCertificate,
609    ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
610        let chain_id = certificate.block().header.chain_id;
611
612        self.query_chain_worker(chain_id, move |callback| {
613            ChainWorkerRequest::ProcessValidatedBlock {
614                certificate,
615                callback,
616            }
617        })
618        .await
619    }
620
621    /// Processes a leader timeout issued from a multi-owner chain.
622    #[instrument(level = "trace", skip(self, certificate))]
623    async fn process_timeout(
624        &self,
625        certificate: TimeoutCertificate,
626    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
627        let chain_id = certificate.value().chain_id();
628        self.query_chain_worker(chain_id, move |callback| {
629            ChainWorkerRequest::ProcessTimeout {
630                certificate,
631                callback,
632            }
633        })
634        .await
635    }
636
637    #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
638    async fn process_cross_chain_update(
639        &self,
640        origin: ChainId,
641        recipient: ChainId,
642        bundles: Vec<(Epoch, MessageBundle)>,
643    ) -> Result<Option<BlockHeight>, WorkerError> {
644        self.query_chain_worker(recipient, move |callback| {
645            ChainWorkerRequest::ProcessCrossChainUpdate {
646                origin,
647                bundles,
648                callback,
649            }
650        })
651        .await
652    }
653
654    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
655    #[instrument(level = "trace", skip(self, chain_id, height))]
656    #[cfg(with_testing)]
657    pub async fn read_certificate(
658        &self,
659        chain_id: ChainId,
660        height: BlockHeight,
661    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
662        self.query_chain_worker(chain_id, move |callback| {
663            ChainWorkerRequest::ReadCertificate { height, callback }
664        })
665        .await
666    }
667
668    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
669    /// [`ChainId`].
670    ///
671    /// The returned view holds a lock on the chain state, which prevents the worker from changing
672    /// the state of that chain.
673    #[instrument(level = "trace", skip(self))]
674    pub async fn chain_state_view(
675        &self,
676        chain_id: ChainId,
677    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
678        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
679            callback,
680        })
681        .await
682    }
683
684    #[instrument(level = "trace", skip(self, request_builder))]
685    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
686    async fn query_chain_worker<Response>(
687        &self,
688        chain_id: ChainId,
689        request_builder: impl FnOnce(
690            oneshot::Sender<Result<Response, WorkerError>>,
691        ) -> ChainWorkerRequest<StorageClient::Context>,
692    ) -> Result<Response, WorkerError> {
693        let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
694        let (callback, response) = oneshot::channel();
695
696        chain_actor
697            .send((request_builder(callback), tracing::Span::current()))
698            .expect("`ChainWorkerActor` stopped executing unexpectedly");
699
700        response
701            .await
702            .expect("`ChainWorkerActor` stopped executing without responding")
703    }
704
705    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it
706    /// to the cache if needed.
707    #[instrument(level = "trace", skip(self))]
708    async fn get_chain_worker_endpoint(
709        &self,
710        chain_id: ChainId,
711    ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
712        let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
713            loop {
714                match self.try_get_chain_worker_endpoint(chain_id) {
715                    Some(endpoint) => break endpoint,
716                    None => sleep(Duration::from_millis(250)).await,
717                }
718            }
719        })
720        .await
721        .map_err(|_| WorkerError::FullChainWorkerCache)?;
722
723        if let Some(receiver) = new_receiver {
724            let delivery_notifier = self
725                .delivery_notifiers
726                .lock()
727                .unwrap()
728                .entry(chain_id)
729                .or_default()
730                .clone();
731
732            let actor_task = ChainWorkerActor::run(
733                self.chain_worker_config.clone(),
734                self.storage.clone(),
735                self.block_cache.clone(),
736                self.execution_state_cache.clone(),
737                self.tracked_chains.clone(),
738                delivery_notifier,
739                chain_id,
740                receiver,
741            );
742
743            self.chain_worker_tasks
744                .lock()
745                .unwrap()
746                .spawn_task(actor_task);
747        }
748
749        Ok(sender)
750    }
751
752    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, attempting to create one
753    /// and add it to the cache if needed.
754    ///
755    /// Returns [`None`] if the cache is full and no candidate for eviction was found.
756    #[instrument(level = "trace", skip(self))]
757    #[expect(clippy::type_complexity)]
758    fn try_get_chain_worker_endpoint(
759        &self,
760        chain_id: ChainId,
761    ) -> Option<(
762        ChainActorEndpoint<StorageClient>,
763        Option<
764            mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
765        >,
766    )> {
767        let mut chain_workers = self.chain_workers.lock().unwrap();
768
769        if let Some(endpoint) = chain_workers.get(&chain_id) {
770            Some((endpoint.clone(), None))
771        } else {
772            let (sender, receiver) = mpsc::unbounded_channel();
773            chain_workers.insert(chain_id, sender.clone());
774            Some((sender, Some(receiver)))
775        }
776    }
777
778    #[instrument(skip_all, fields(
779        nick = self.nickname,
780        chain_id = format!("{:.8}", proposal.content.block.chain_id),
781        height = %proposal.content.block.height,
782    ))]
783    pub async fn handle_block_proposal(
784        &self,
785        proposal: BlockProposal,
786    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
787        trace!("{} <-- {:?}", self.nickname, proposal);
788        #[cfg(with_metrics)]
789        let round = proposal.content.round;
790        let response = self
791            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
792                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
793            })
794            .await?;
795        #[cfg(with_metrics)]
796        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
797            .with_label_values(&[round.type_name()])
798            .observe(round.number() as f64);
799        Ok(response)
800    }
801
802    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
803    // Other fields will be included in handle_certificate's span.
804    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
805    pub async fn handle_lite_certificate<'a>(
806        &self,
807        certificate: LiteCertificate<'a>,
808        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
809    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
810        match self.full_certificate(certificate).await? {
811            Either::Left(confirmed) => {
812                self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
813                    .await
814            }
815            Either::Right(validated) => {
816                if let Some(notifier) = notify_when_messages_are_delivered {
817                    // Nothing to wait for.
818                    if let Err(()) = notifier.send(()) {
819                        warn!("Failed to notify message delivery to caller");
820                    }
821                }
822                self.handle_validated_certificate(validated).await
823            }
824        }
825    }
826
827    /// Processes a confirmed block certificate.
828    #[instrument(skip_all, fields(
829        nick = self.nickname,
830        chain_id = format!("{:.8}", certificate.block().header.chain_id),
831        height = %certificate.block().header.height,
832    ))]
833    pub async fn handle_confirmed_certificate(
834        &self,
835        certificate: ConfirmedBlockCertificate,
836        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
837    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
838        trace!("{} <-- {:?}", self.nickname, certificate);
839        #[cfg(with_metrics)]
840        {
841            let confirmed_transactions = (certificate.block().body.incoming_bundles.len()
842                + certificate.block().body.operations.len())
843                as u64;
844
845            metrics::NUM_ROUNDS_IN_CERTIFICATE
846                .with_label_values(&[
847                    certificate.inner().to_log_str(),
848                    certificate.round.type_name(),
849                ])
850                .observe(certificate.round.number() as f64);
851            if confirmed_transactions > 0 {
852                metrics::TRANSACTION_COUNT
853                    .with_label_values(&[])
854                    .inc_by(confirmed_transactions);
855            }
856
857            for (validator_name, _) in certificate.signatures() {
858                metrics::CERTIFICATES_SIGNED
859                    .with_label_values(&[&validator_name.to_string()])
860                    .inc();
861            }
862        }
863
864        self.process_confirmed_block(certificate, notify_when_messages_are_delivered)
865            .await
866    }
867
868    /// Processes a validated block certificate.
869    #[instrument(skip_all, fields(
870        nick = self.nickname,
871        chain_id = format!("{:.8}", certificate.block().header.chain_id),
872        height = %certificate.block().header.height,
873    ))]
874    pub async fn handle_validated_certificate(
875        &self,
876        certificate: ValidatedBlockCertificate,
877    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
878        trace!("{} <-- {:?}", self.nickname, certificate);
879
880        #[cfg(with_metrics)]
881        let round = certificate.round;
882        #[cfg(with_metrics)]
883        let cert_str = certificate.inner().to_log_str();
884
885        let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
886        #[cfg(with_metrics)]
887        {
888            if !_duplicated {
889                metrics::NUM_ROUNDS_IN_CERTIFICATE
890                    .with_label_values(&[cert_str, round.type_name()])
891                    .observe(round.number() as f64);
892            }
893        }
894        Ok((info, actions))
895    }
896
897    /// Processes a timeout certificate
898    #[instrument(skip_all, fields(
899        nick = self.nickname,
900        chain_id = format!("{:.8}", certificate.inner().chain_id()),
901        height = %certificate.inner().height(),
902    ))]
903    pub async fn handle_timeout_certificate(
904        &self,
905        certificate: TimeoutCertificate,
906    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
907        trace!("{} <-- {:?}", self.nickname, certificate);
908        self.process_timeout(certificate).await
909    }
910
911    #[instrument(skip_all, fields(
912        nick = self.nickname,
913        chain_id = format!("{:.8}", query.chain_id)
914    ))]
915    pub async fn handle_chain_info_query(
916        &self,
917        query: ChainInfoQuery,
918    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
919        trace!("{} <-- {:?}", self.nickname, query);
920        let result = self
921            .query_chain_worker(query.chain_id, move |callback| {
922                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
923            })
924            .await;
925        trace!("{} --> {:?}", self.nickname, result);
926        result
927    }
928
929    #[instrument(skip_all, fields(
930        nick = self.nickname,
931        chain_id = format!("{:.8}", chain_id)
932    ))]
933    pub async fn download_pending_blob(
934        &self,
935        chain_id: ChainId,
936        blob_id: BlobId,
937    ) -> Result<Blob, WorkerError> {
938        trace!(
939            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
940            self.nickname
941        );
942        let result = self
943            .query_chain_worker(chain_id, move |callback| {
944                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
945            })
946            .await;
947        trace!(
948            "{} --> {:?}",
949            self.nickname,
950            result.as_ref().map(|_| blob_id)
951        );
952        result
953    }
954
955    #[instrument(skip_all, fields(
956        nick = self.nickname,
957        chain_id = format!("{:.8}", chain_id)
958    ))]
959    pub async fn handle_pending_blob(
960        &self,
961        chain_id: ChainId,
962        blob: Blob,
963    ) -> Result<ChainInfoResponse, WorkerError> {
964        let blob_id = blob.id();
965        trace!(
966            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
967            self.nickname
968        );
969        let result = self
970            .query_chain_worker(chain_id, move |callback| {
971                ChainWorkerRequest::HandlePendingBlob { blob, callback }
972            })
973            .await;
974        trace!(
975            "{} --> {:?}",
976            self.nickname,
977            result.as_ref().map(|_| blob_id)
978        );
979        result
980    }
981
982    #[instrument(skip_all, fields(
983        nick = self.nickname,
984        chain_id = format!("{:.8}", request.target_chain_id())
985    ))]
986    pub async fn handle_cross_chain_request(
987        &self,
988        request: CrossChainRequest,
989    ) -> Result<NetworkActions, WorkerError> {
990        trace!("{} <-- {:?}", self.nickname, request);
991        match request {
992            CrossChainRequest::UpdateRecipient {
993                sender,
994                recipient,
995                bundles,
996            } => {
997                let mut actions = NetworkActions::default();
998                let origin = sender;
999                let Some(height) = self
1000                    .process_cross_chain_update(origin, recipient, bundles)
1001                    .await?
1002                else {
1003                    return Ok(actions);
1004                };
1005                actions.notifications.push(Notification {
1006                    chain_id: recipient,
1007                    reason: Reason::NewIncomingBundle { origin, height },
1008                });
1009                actions
1010                    .cross_chain_requests
1011                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1012                        sender,
1013                        recipient,
1014                        latest_height: height,
1015                    });
1016                Ok(actions)
1017            }
1018            CrossChainRequest::ConfirmUpdatedRecipient {
1019                sender,
1020                recipient,
1021                latest_height,
1022            } => {
1023                self.query_chain_worker(sender, move |callback| {
1024                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1025                        recipient,
1026                        latest_height,
1027                        callback,
1028                    }
1029                })
1030                .await?;
1031                Ok(NetworkActions::default())
1032            }
1033        }
1034    }
1035
1036    /// Updates the received certificate trackers to at least the given values.
1037    pub async fn update_received_certificate_trackers(
1038        &self,
1039        chain_id: ChainId,
1040        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1041    ) -> Result<(), WorkerError> {
1042        self.query_chain_worker(chain_id, move |callback| {
1043            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1044                new_trackers,
1045                callback,
1046            }
1047        })
1048        .await
1049    }
1050}
1051
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the validator's [`ValidatorPublicKey`], derived from the key pair in the
    /// chain worker configuration.
    ///
    /// # Panics
    ///
    /// If the validator doesn't have a key pair assigned to it.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it in order to obtain it's public key",
        );
        key_pair.public()
    }
}