linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{
15        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16    },
17    doc_scalar,
18    hashed::Hashed,
19    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20    time::Instant,
21    util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
27    types::{
28        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
29        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
30    },
31    ChainError, ChainStateView,
32};
33use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
34use linera_storage::Storage;
35use linera_views::{context::InactiveContext, ViewError};
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
39use tracing::{error, instrument, trace, warn};
40
41/// Re-export of [`EventSubscriptionsResult`] for use by other crate modules.
42pub(crate) use crate::chain_worker::EventSubscriptionsResult;
43use crate::{
44    chain_worker::{
45        BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
46    },
47    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
48    join_set_ext::{JoinSet, JoinSetExt},
49    notifier::Notifier,
50    value_cache::ValueCache,
51    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
52};
53
54#[cfg(test)]
55#[path = "unit_tests/worker_tests.rs"]
56mod worker_tests;
57
58#[cfg(with_metrics)]
59mod metrics {
60    use std::sync::LazyLock;
61
62    use linera_base::prometheus_util::{
63        exponential_bucket_interval, register_histogram_vec, register_int_counter,
64        register_int_counter_vec,
65    };
66    use prometheus::{HistogramVec, IntCounter, IntCounterVec};
67
68    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
69        register_histogram_vec(
70            "num_rounds_in_certificate",
71            "Number of rounds in certificate",
72            &["certificate_value", "round_type"],
73            exponential_bucket_interval(0.1, 50.0),
74        )
75    });
76
77    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
78        register_histogram_vec(
79            "num_rounds_in_block_proposal",
80            "Number of rounds in block proposal",
81            &["round_type"],
82            exponential_bucket_interval(0.1, 50.0),
83        )
84    });
85
86    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
87        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
88
89    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
90        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));
91
92    pub static OPERATION_COUNT: LazyLock<IntCounter> =
93        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));
94
95    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
96        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
97    });
98
99    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
100        register_int_counter_vec(
101            "certificates_signed",
102            "Number of confirmed block certificates signed by each validator",
103            &["validator_name"],
104        )
105    });
106
107    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
108        register_int_counter(
109            "chain_info_queries",
110            "Number of chain info queries processed",
111        )
112    });
113}
114
115/// Instruct the networking layer to send cross-chain requests and/or push notifications.
116#[derive(Default, Debug)]
117pub struct NetworkActions {
118    /// The cross-chain requests
119    pub cross_chain_requests: Vec<CrossChainRequest>,
120    /// The push notifications.
121    pub notifications: Vec<Notification>,
122}
123
124impl NetworkActions {
125    pub fn extend(&mut self, other: NetworkActions) {
126        self.cross_chain_requests.extend(other.cross_chain_requests);
127        self.notifications.extend(other.notifications);
128    }
129}
130
131#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
132/// Notification that a chain has a new certified block or a new message.
133pub struct Notification {
134    pub chain_id: ChainId,
135    pub reason: Reason,
136}
137
138doc_scalar!(
139    Notification,
140    "Notify that a chain has a new certified block or a new message"
141);
142
143#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
144/// Reason for the notification.
145pub enum Reason {
146    NewBlock {
147        height: BlockHeight,
148        hash: CryptoHash,
149    },
150    NewEvents {
151        height: BlockHeight,
152        hash: CryptoHash,
153        event_streams: BTreeSet<StreamId>,
154    },
155    NewIncomingBundle {
156        origin: ChainId,
157        height: BlockHeight,
158    },
159    NewRound {
160        height: BlockHeight,
161        round: Round,
162    },
163    BlockExecuted {
164        height: BlockHeight,
165        hash: CryptoHash,
166    },
167}
168
169/// Error type for worker operations.
170#[derive(Debug, Error)]
171pub enum WorkerError {
172    #[error(transparent)]
173    CryptoError(#[from] CryptoError),
174
175    #[error(transparent)]
176    ArithmeticError(#[from] ArithmeticError),
177
178    #[error(transparent)]
179    ViewError(#[from] ViewError),
180
181    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
182    ReadCertificatesError(Vec<CryptoHash>),
183
184    #[error(transparent)]
185    ChainError(#[from] Box<ChainError>),
186
187    #[error(transparent)]
188    BcsError(#[from] bcs::Error),
189
190    // Chain access control
191    #[error("Block was not signed by an authorized owner")]
192    InvalidOwner,
193
194    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
195    InvalidSigner(AccountOwner),
196
197    // Chaining
198    #[error(
199        "Chain is expecting a next block at height {expected_block_height} but the given block \
200        is at height {found_block_height} instead"
201    )]
202    UnexpectedBlockHeight {
203        expected_block_height: BlockHeight,
204        found_block_height: BlockHeight,
205    },
206    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
207    InvalidEpoch {
208        chain_id: ChainId,
209        chain_epoch: Epoch,
210        epoch: Epoch,
211    },
212
213    #[error("Events not found: {0:?}")]
214    EventsNotFound(Vec<EventId>),
215
216    // Other server-side errors
217    #[error("Invalid cross-chain request")]
218    InvalidCrossChainRequest,
219    #[error("The block does not contain the hash that we expected for the previous block")]
220    InvalidBlockChaining,
221    #[error(
222        "The given outcome is not what we computed after executing the block.\n\
223        Computed: {computed:#?}\n\
224        Submitted: {submitted:#?}"
225    )]
226    IncorrectOutcome {
227        computed: Box<BlockExecutionOutcome>,
228        submitted: Box<BlockExecutionOutcome>,
229    },
230    #[error(
231        "Block timestamp ({block_timestamp}) is further in the future from local time \
232        ({local_time}) than block time grace period ({block_time_grace_period:?})"
233    )]
234    InvalidTimestamp {
235        block_timestamp: Timestamp,
236        local_time: Timestamp,
237        block_time_grace_period: Duration,
238    },
239    #[error("We don't have the value for the certificate.")]
240    MissingCertificateValue,
241    #[error("The hash certificate doesn't match its value.")]
242    InvalidLiteCertificate,
243    #[error("Fast blocks cannot query oracles")]
244    FastBlockUsingOracles,
245    #[error("Blobs not found: {0:?}")]
246    BlobsNotFound(Vec<BlobId>),
247    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
248    ConfirmedLogEntryNotFound {
249        height: BlockHeight,
250        chain_id: ChainId,
251    },
252    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
253    PreprocessedBlocksEntryNotFound {
254        height: BlockHeight,
255        chain_id: ChainId,
256    },
257    #[error("The block proposal is invalid: {0}")]
258    InvalidBlockProposal(String),
259    #[error("Blob was not required by any pending block")]
260    UnexpectedBlob,
261    #[error("Number of published blobs per block must not exceed {0}")]
262    TooManyPublishedBlobs(u64),
263    #[error("Missing network description")]
264    MissingNetworkDescription,
265    #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
266    ChainActorSendError {
267        chain_id: ChainId,
268        error: Box<dyn DynError>,
269    },
270    #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
271    ChainActorRecvError {
272        chain_id: ChainId,
273        error: Box<dyn DynError>,
274    },
275
276    #[error("thread error: {0}")]
277    Thread(#[from] web_thread_pool::Error),
278}
279
280impl WorkerError {
281    /// Returns whether this error is caused by an issue in the local node.
282    ///
283    /// Returns `false` whenever the error could be caused by a bad message from a peer.
284    pub fn is_local(&self) -> bool {
285        match self {
286            WorkerError::CryptoError(_)
287            | WorkerError::ArithmeticError(_)
288            | WorkerError::InvalidOwner
289            | WorkerError::InvalidSigner(_)
290            | WorkerError::UnexpectedBlockHeight { .. }
291            | WorkerError::InvalidEpoch { .. }
292            | WorkerError::EventsNotFound(_)
293            | WorkerError::InvalidBlockChaining
294            | WorkerError::IncorrectOutcome { .. }
295            | WorkerError::InvalidTimestamp { .. }
296            | WorkerError::MissingCertificateValue
297            | WorkerError::InvalidLiteCertificate
298            | WorkerError::FastBlockUsingOracles
299            | WorkerError::BlobsNotFound(_)
300            | WorkerError::InvalidBlockProposal(_)
301            | WorkerError::UnexpectedBlob
302            | WorkerError::TooManyPublishedBlobs(_)
303            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
304            WorkerError::BcsError(_)
305            | WorkerError::InvalidCrossChainRequest
306            | WorkerError::ViewError(_)
307            | WorkerError::ConfirmedLogEntryNotFound { .. }
308            | WorkerError::PreprocessedBlocksEntryNotFound { .. }
309            | WorkerError::MissingNetworkDescription
310            | WorkerError::ChainActorSendError { .. }
311            | WorkerError::ChainActorRecvError { .. }
312            | WorkerError::Thread(_)
313            | WorkerError::ReadCertificatesError(_) => true,
314            WorkerError::ChainError(chain_error) => chain_error.is_local(),
315        }
316    }
317}
318
319impl From<ChainError> for WorkerError {
320    #[instrument(level = "trace", skip(chain_error))]
321    fn from(chain_error: ChainError) -> Self {
322        match chain_error {
323            ChainError::ExecutionError(execution_error, context) => match *execution_error {
324                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
325                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
326                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
327                    execution_error,
328                    context,
329                ))),
330            },
331            error => Self::ChainError(Box::new(error)),
332        }
333    }
334}
335
336#[cfg(with_testing)]
337impl WorkerError {
338    /// Returns the inner [`ExecutionError`] in this error.
339    ///
340    /// # Panics
341    ///
342    /// If this is not caused by an [`ExecutionError`].
343    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
344        let WorkerError::ChainError(chain_error) = self else {
345            panic!("Expected an `ExecutionError`. Got: {self:#?}");
346        };
347
348        let ChainError::ExecutionError(execution_error, context) = *chain_error else {
349            panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
350        };
351
352        assert_eq!(context, expected_context);
353
354        *execution_error
355    }
356}
357
358/// State of a worker in a validator or a local node.
359pub struct WorkerState<StorageClient>
360where
361    StorageClient: Storage,
362{
363    /// A name used for logging
364    nickname: String,
365    /// Access to local persistent storage.
366    storage: StorageClient,
367    /// Configuration options for the [`ChainWorker`]s.
368    chain_worker_config: ChainWorkerConfig,
369    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
370    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
371    /// Chain IDs that should be tracked by a worker.
372    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
373    /// One-shot channels to notify callers when messages of a particular chain have been
374    /// delivered.
375    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
376    /// The set of spawned [`ChainWorkerActor`] tasks.
377    chain_worker_tasks: Arc<Mutex<JoinSet>>,
378    /// The cache of running [`ChainWorkerActor`]s.
379    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
380}
381
382impl<StorageClient> Clone for WorkerState<StorageClient>
383where
384    StorageClient: Storage + Clone,
385{
386    fn clone(&self) -> Self {
387        WorkerState {
388            nickname: self.nickname.clone(),
389            storage: self.storage.clone(),
390            chain_worker_config: self.chain_worker_config.clone(),
391            block_cache: self.block_cache.clone(),
392            execution_state_cache: self.execution_state_cache.clone(),
393            tracked_chains: self.tracked_chains.clone(),
394            delivery_notifiers: self.delivery_notifiers.clone(),
395            chain_worker_tasks: self.chain_worker_tasks.clone(),
396            chain_workers: self.chain_workers.clone(),
397        }
398    }
399}
400
401/// The sender endpoint for [`ChainWorkerRequest`]s.
402type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
403    ChainWorkerRequest<<StorageClient as Storage>::Context>,
404    tracing::Span,
405    Instant,
406)>;
407
408pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
409
410impl<StorageClient> WorkerState<StorageClient>
411where
412    StorageClient: Storage,
413{
414    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
415    pub fn new(
416        nickname: String,
417        key_pair: Option<ValidatorSecretKey>,
418        storage: StorageClient,
419        block_cache_size: usize,
420        execution_state_cache_size: usize,
421    ) -> Self {
422        WorkerState {
423            nickname,
424            storage,
425            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
426            block_cache: Arc::new(ValueCache::new(block_cache_size)),
427            execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
428            tracked_chains: None,
429            delivery_notifiers: Arc::default(),
430            chain_worker_tasks: Arc::default(),
431            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
432        }
433    }
434
435    #[instrument(level = "trace", skip(nickname, storage))]
436    pub fn new_for_client(
437        nickname: String,
438        storage: StorageClient,
439        tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
440        block_cache_size: usize,
441        execution_state_cache_size: usize,
442    ) -> Self {
443        WorkerState {
444            nickname,
445            storage,
446            chain_worker_config: ChainWorkerConfig::default(),
447            block_cache: Arc::new(ValueCache::new(block_cache_size)),
448            execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
449            tracked_chains: Some(tracked_chains),
450            delivery_notifiers: Arc::default(),
451            chain_worker_tasks: Arc::default(),
452            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
453        }
454    }
455
456    #[instrument(level = "trace", skip(self, value))]
457    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
458        self.chain_worker_config.allow_inactive_chains = value;
459        self
460    }
461
462    #[instrument(level = "trace", skip(self, value))]
463    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
464        self.chain_worker_config
465            .allow_messages_from_deprecated_epochs = value;
466        self
467    }
468
469    #[instrument(level = "trace", skip(self, value))]
470    pub fn with_long_lived_services(mut self, value: bool) -> Self {
471        self.chain_worker_config.long_lived_services = value;
472        self
473    }
474
475    /// Returns an instance with the specified block time grace period.
476    ///
477    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
478    /// will wait until that timestamp before voting.
479    #[instrument(level = "trace", skip(self))]
480    pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
481        self.chain_worker_config.block_time_grace_period = block_time_grace_period;
482        self
483    }
484
485    /// Returns an instance with the specified chain worker TTL.
486    ///
487    /// Idle chain workers free their memory after that duration without requests.
488    #[instrument(level = "trace", skip(self))]
489    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
490        self.chain_worker_config.ttl = chain_worker_ttl;
491        self
492    }
493
494    /// Returns an instance with the specified sender chain worker TTL.
495    ///
496    /// Idle sender chain workers free their memory after that duration without requests.
497    #[instrument(level = "trace", skip(self))]
498    pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
499        self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
500        self
501    }
502
503    /// Returns an instance with the specified maximum size for received_log entries.
504    ///
505    /// Sizes below `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` should be avoided.
506    #[instrument(level = "trace", skip(self))]
507    pub fn with_chain_info_max_received_log_entries(
508        mut self,
509        chain_info_max_received_log_entries: usize,
510    ) -> Self {
511        if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
512            warn!(
513                "The value set for the maximum size of received_log entries \
514                   may not be compatible with the latest clients: {} instead of {}",
515                chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
516            );
517        }
518        self.chain_worker_config.chain_info_max_received_log_entries =
519            chain_info_max_received_log_entries;
520        self
521    }
522
523    #[instrument(level = "trace", skip(self))]
524    pub fn nickname(&self) -> &str {
525        &self.nickname
526    }
527
528    /// Returns the storage client so that it can be manipulated or queried.
529    #[instrument(level = "trace", skip(self))]
530    #[cfg(not(feature = "test"))]
531    pub(crate) fn storage_client(&self) -> &StorageClient {
532        &self.storage
533    }
534
535    /// Returns the storage client so that it can be manipulated or queried by tests in other
536    /// crates.
537    #[instrument(level = "trace", skip(self))]
538    #[cfg(feature = "test")]
539    pub fn storage_client(&self) -> &StorageClient {
540        &self.storage
541    }
542
543    #[instrument(level = "trace", skip(self, certificate))]
544    pub(crate) async fn full_certificate(
545        &self,
546        certificate: LiteCertificate<'_>,
547    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
548        let block = self
549            .block_cache
550            .get(&certificate.value.value_hash)
551            .ok_or(WorkerError::MissingCertificateValue)?;
552
553        match certificate.value.kind {
554            linera_chain::types::CertificateKind::Confirmed => {
555                let value = ConfirmedBlock::from_hashed(block);
556                Ok(Either::Left(
557                    certificate
558                        .with_value(value)
559                        .ok_or(WorkerError::InvalidLiteCertificate)?,
560                ))
561            }
562            linera_chain::types::CertificateKind::Validated => {
563                let value = ValidatedBlock::from_hashed(block);
564                Ok(Either::Right(
565                    certificate
566                        .with_value(value)
567                        .ok_or(WorkerError::InvalidLiteCertificate)?,
568                ))
569            }
570            _ => Err(WorkerError::InvalidLiteCertificate),
571        }
572    }
573}
574
575#[allow(async_fn_in_trait)]
576#[cfg_attr(not(web), trait_variant::make(Send))]
577pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
578    async fn process_certificate<S: Storage + Clone + 'static>(
579        worker: &WorkerState<S>,
580        certificate: GenericCertificate<Self>,
581    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
582}
583
584impl ProcessableCertificate for ConfirmedBlock {
585    async fn process_certificate<S: Storage + Clone + 'static>(
586        worker: &WorkerState<S>,
587        certificate: ConfirmedBlockCertificate,
588    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
589        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
590    }
591}
592
593impl ProcessableCertificate for ValidatedBlock {
594    async fn process_certificate<S: Storage + Clone + 'static>(
595        worker: &WorkerState<S>,
596        certificate: ValidatedBlockCertificate,
597    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
598        Box::pin(worker.handle_validated_certificate(certificate)).await
599    }
600}
601
602impl ProcessableCertificate for Timeout {
603    async fn process_certificate<S: Storage + Clone + 'static>(
604        worker: &WorkerState<S>,
605        certificate: TimeoutCertificate,
606    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
607        worker.handle_timeout_certificate(certificate).await
608    }
609}
610
611impl<StorageClient> WorkerState<StorageClient>
612where
613    StorageClient: Storage + Clone + 'static,
614{
615    #[instrument(level = "trace", skip(self, certificate, notifier))]
616    #[inline]
617    pub async fn fully_handle_certificate_with_notifications<T>(
618        &self,
619        certificate: GenericCertificate<T>,
620        notifier: &impl Notifier,
621    ) -> Result<ChainInfoResponse, WorkerError>
622    where
623        T: ProcessableCertificate,
624    {
625        let notifications = (*notifier).clone();
626        let this = self.clone();
627        linera_base::task::spawn(async move {
628            let (response, actions) =
629                ProcessableCertificate::process_certificate(&this, certificate).await?;
630            notifications.notify(&actions.notifications);
631            let mut requests = VecDeque::from(actions.cross_chain_requests);
632            while let Some(request) = requests.pop_front() {
633                let actions = this.handle_cross_chain_request(request).await?;
634                requests.extend(actions.cross_chain_requests);
635                notifications.notify(&actions.notifications);
636            }
637            Ok(response)
638        })
639        .await
640    }
641
642    /// Tries to execute a block proposal without any verification other than block execution.
643    #[instrument(level = "trace", skip(self, block))]
644    pub async fn stage_block_execution(
645        &self,
646        block: ProposedBlock,
647        round: Option<u32>,
648        published_blobs: Vec<Blob>,
649    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
650        self.query_chain_worker(block.chain_id, move |callback| {
651            ChainWorkerRequest::StageBlockExecution {
652                block,
653                round,
654                published_blobs,
655                callback,
656            }
657        })
658        .await
659    }
660
661    /// Executes a [`Query`] for an application's state on a specific chain.
662    ///
663    /// If `block_hash` is specified, system will query the application's state
664    /// at that block. If it doesn't exist, it uses latest state.
665    #[instrument(level = "trace", skip(self, chain_id, query))]
666    pub async fn query_application(
667        &self,
668        chain_id: ChainId,
669        query: Query,
670        block_hash: Option<CryptoHash>,
671    ) -> Result<QueryOutcome, WorkerError> {
672        self.query_chain_worker(chain_id, move |callback| {
673            ChainWorkerRequest::QueryApplication {
674                query,
675                block_hash,
676                callback,
677            }
678        })
679        .await
680    }
681
682    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
683        nickname = %self.nickname,
684        chain_id = %chain_id,
685        application_id = %application_id
686    ))]
687    pub async fn describe_application(
688        &self,
689        chain_id: ChainId,
690        application_id: ApplicationId,
691    ) -> Result<ApplicationDescription, WorkerError> {
692        self.query_chain_worker(chain_id, move |callback| {
693            ChainWorkerRequest::DescribeApplication {
694                application_id,
695                callback,
696            }
697        })
698        .await
699    }
700
701    /// Processes a confirmed block (aka a commit).
702    #[instrument(
703        level = "trace",
704        skip(self, certificate, notify_when_messages_are_delivered),
705        fields(
706            nickname = %self.nickname,
707            chain_id = %certificate.block().header.chain_id,
708            block_height = %certificate.block().header.height
709        )
710    )]
711    async fn process_confirmed_block(
712        &self,
713        certificate: ConfirmedBlockCertificate,
714        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
715    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
716        let chain_id = certificate.block().header.chain_id;
717        self.query_chain_worker(chain_id, move |callback| {
718            ChainWorkerRequest::ProcessConfirmedBlock {
719                certificate,
720                notify_when_messages_are_delivered,
721                callback,
722            }
723        })
724        .await
725    }
726
727    /// Processes a validated block issued from a multi-owner chain.
728    #[instrument(level = "trace", skip(self, certificate), fields(
729        nickname = %self.nickname,
730        chain_id = %certificate.block().header.chain_id,
731        block_height = %certificate.block().header.height
732    ))]
733    async fn process_validated_block(
734        &self,
735        certificate: ValidatedBlockCertificate,
736    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
737        let chain_id = certificate.block().header.chain_id;
738        self.query_chain_worker(chain_id, move |callback| {
739            ChainWorkerRequest::ProcessValidatedBlock {
740                certificate,
741                callback,
742            }
743        })
744        .await
745    }
746
747    /// Processes a leader timeout issued from a multi-owner chain.
748    #[instrument(level = "trace", skip(self, certificate), fields(
749        nickname = %self.nickname,
750        chain_id = %certificate.value().chain_id(),
751        height = %certificate.value().height()
752    ))]
753    async fn process_timeout(
754        &self,
755        certificate: TimeoutCertificate,
756    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
757        let chain_id = certificate.value().chain_id();
758        self.query_chain_worker(chain_id, move |callback| {
759            ChainWorkerRequest::ProcessTimeout {
760                certificate,
761                callback,
762            }
763        })
764        .await
765    }
766
767    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
768        nickname = %self.nickname,
769        origin = %origin,
770        recipient = %recipient,
771        num_bundles = %bundles.len()
772    ))]
773    async fn process_cross_chain_update(
774        &self,
775        origin: ChainId,
776        recipient: ChainId,
777        bundles: Vec<(Epoch, MessageBundle)>,
778    ) -> Result<Option<BlockHeight>, WorkerError> {
779        self.query_chain_worker(recipient, move |callback| {
780            ChainWorkerRequest::ProcessCrossChainUpdate {
781                origin,
782                bundles,
783                callback,
784            }
785        })
786        .await
787    }
788
789    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
790    #[instrument(level = "trace", skip(self, chain_id, height), fields(
791        nickname = %self.nickname,
792        chain_id = %chain_id,
793        height = %height
794    ))]
795    #[cfg(with_testing)]
796    pub async fn read_certificate(
797        &self,
798        chain_id: ChainId,
799        height: BlockHeight,
800    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
801        self.query_chain_worker(chain_id, move |callback| {
802            ChainWorkerRequest::ReadCertificate { height, callback }
803        })
804        .await
805    }
806
807    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
808    /// [`ChainId`].
809    ///
810    /// The returned view holds a lock on the chain state, which prevents the worker from changing
811    /// the state of that chain.
812    #[instrument(level = "trace", skip(self), fields(
813        nickname = %self.nickname,
814        chain_id = %chain_id
815    ))]
816    pub async fn chain_state_view(
817        &self,
818        chain_id: ChainId,
819    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
820        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
821            callback,
822        })
823        .await
824    }
825
826    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
827    #[instrument(level = "trace", skip(self, request_builder), fields(
828        nickname = %self.nickname,
829        chain_id = %chain_id
830    ))]
831    async fn query_chain_worker<Response>(
832        &self,
833        chain_id: ChainId,
834        request_builder: impl FnOnce(
835            oneshot::Sender<Result<Response, WorkerError>>,
836        ) -> ChainWorkerRequest<StorageClient::Context>,
837    ) -> Result<Response, WorkerError> {
838        // Build the request.
839        let (callback, response) = oneshot::channel();
840        let request = request_builder(callback);
841
842        // Call the endpoint, possibly a new one.
843        let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
844
845        // We just created an endpoint: spawn the actor.
846        if let Some(receiver) = new_receiver {
847            let delivery_notifier = self
848                .delivery_notifiers
849                .lock()
850                .unwrap()
851                .entry(chain_id)
852                .or_default()
853                .clone();
854
855            let is_tracked = self
856                .tracked_chains
857                .as_ref()
858                .is_some_and(|tracked_chains| tracked_chains.read().unwrap().contains(&chain_id));
859
860            let actor_task = ChainWorkerActor::run(
861                self.chain_worker_config.clone(),
862                self.storage.clone(),
863                self.block_cache.clone(),
864                self.execution_state_cache.clone(),
865                self.tracked_chains.clone(),
866                delivery_notifier,
867                chain_id,
868                receiver,
869                is_tracked,
870            );
871
872            self.chain_worker_tasks
873                .lock()
874                .unwrap()
875                .spawn_task(actor_task);
876        }
877
878        // Finally, wait a response.
879        match response.await {
880            Err(e) => {
881                // The actor endpoint was dropped. Better luck next time!
882                Err(WorkerError::ChainActorRecvError {
883                    chain_id,
884                    error: Box::new(e),
885                })
886            }
887            Ok(response) => response,
888        }
889    }
890
891    /// Find an endpoint and call it. Create the endpoint if necessary.
892    #[instrument(level = "trace", skip(self), fields(
893        nickname = %self.nickname,
894        chain_id = %chain_id
895    ))]
896    #[expect(clippy::type_complexity)]
897    fn call_and_maybe_create_chain_worker_endpoint(
898        &self,
899        chain_id: ChainId,
900        request: ChainWorkerRequest<StorageClient::Context>,
901    ) -> Result<
902        Option<
903            mpsc::UnboundedReceiver<(
904                ChainWorkerRequest<StorageClient::Context>,
905                tracing::Span,
906                Instant,
907            )>,
908        >,
909        WorkerError,
910    > {
911        let mut chain_workers = self.chain_workers.lock().unwrap();
912
913        let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
914            (endpoint, None)
915        } else {
916            let (sender, receiver) = mpsc::unbounded_channel();
917            (sender, Some(receiver))
918        };
919
920        if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
921            // The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
922            return Err(WorkerError::ChainActorSendError {
923                chain_id,
924                error: Box::new(e),
925            });
926        }
927
928        // Put back the sender in the cache for next time.
929        chain_workers.insert(chain_id, sender);
930
931        Ok(new_receiver)
932    }
933
934    #[instrument(skip_all, fields(
935        nick = self.nickname,
936        chain_id = format!("{:.8}", proposal.content.block.chain_id),
937        height = %proposal.content.block.height,
938    ))]
939    pub async fn handle_block_proposal(
940        &self,
941        proposal: BlockProposal,
942    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
943        trace!("{} <-- {:?}", self.nickname, proposal);
944        #[cfg(with_metrics)]
945        let round = proposal.content.round;
946        let response = self
947            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
948                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
949            })
950            .await?;
951        #[cfg(with_metrics)]
952        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
953            .with_label_values(&[round.type_name()])
954            .observe(round.number() as f64);
955        Ok(response)
956    }
957
958    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
959    // Other fields will be included in handle_certificate's span.
960    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
961    pub async fn handle_lite_certificate(
962        &self,
963        certificate: LiteCertificate<'_>,
964        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
965    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
966        match self.full_certificate(certificate).await? {
967            Either::Left(confirmed) => {
968                Box::pin(
969                    self.handle_confirmed_certificate(
970                        confirmed,
971                        notify_when_messages_are_delivered,
972                    ),
973                )
974                .await
975            }
976            Either::Right(validated) => {
977                if let Some(notifier) = notify_when_messages_are_delivered {
978                    // Nothing to wait for.
979                    if let Err(()) = notifier.send(()) {
980                        warn!("Failed to notify message delivery to caller");
981                    }
982                }
983                Box::pin(self.handle_validated_certificate(validated)).await
984            }
985        }
986    }
987
988    /// Processes a confirmed block certificate.
989    #[instrument(skip_all, fields(
990        nick = self.nickname,
991        chain_id = format!("{:.8}", certificate.block().header.chain_id),
992        height = %certificate.block().header.height,
993    ))]
994    pub async fn handle_confirmed_certificate(
995        &self,
996        certificate: ConfirmedBlockCertificate,
997        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
998    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
999        trace!("{} <-- {:?}", self.nickname, certificate);
1000        #[cfg(with_metrics)]
1001        let metrics_data = (
1002            certificate.inner().to_log_str(),
1003            certificate.round.type_name(),
1004            certificate.round.number(),
1005            certificate.block().body.transactions.len() as u64,
1006            certificate.block().body.incoming_bundles().count() as u64,
1007            certificate.block().body.operations().count() as u64,
1008            certificate
1009                .signatures()
1010                .iter()
1011                .map(|(validator_name, _)| validator_name.to_string())
1012                .collect::<Vec<_>>(),
1013        );
1014
1015        let (info, actions, _outcome) =
1016            Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1017                .await?;
1018
1019        #[cfg(with_metrics)]
1020        {
1021            if matches!(_outcome, BlockOutcome::Processed) {
1022                let (
1023                    certificate_log_str,
1024                    round_type,
1025                    round_number,
1026                    confirmed_transactions,
1027                    confirmed_incoming_bundles,
1028                    confirmed_operations,
1029                    validators_with_signatures,
1030                ) = metrics_data;
1031                metrics::NUM_BLOCKS.with_label_values(&[]).inc();
1032                metrics::NUM_ROUNDS_IN_CERTIFICATE
1033                    .with_label_values(&[certificate_log_str, round_type])
1034                    .observe(round_number as f64);
1035                if confirmed_transactions > 0 {
1036                    metrics::TRANSACTION_COUNT
1037                        .with_label_values(&[])
1038                        .inc_by(confirmed_transactions);
1039                    if confirmed_incoming_bundles > 0 {
1040                        metrics::INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
1041                    }
1042                    if confirmed_operations > 0 {
1043                        metrics::OPERATION_COUNT.inc_by(confirmed_operations);
1044                    }
1045                }
1046
1047                for validator_name in validators_with_signatures {
1048                    metrics::CERTIFICATES_SIGNED
1049                        .with_label_values(&[&validator_name])
1050                        .inc();
1051                }
1052            }
1053        }
1054        Ok((info, actions))
1055    }
1056
1057    /// Processes a validated block certificate.
1058    #[instrument(skip_all, fields(
1059        nick = self.nickname,
1060        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1061        height = %certificate.block().header.height,
1062    ))]
1063    pub async fn handle_validated_certificate(
1064        &self,
1065        certificate: ValidatedBlockCertificate,
1066    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1067        trace!("{} <-- {:?}", self.nickname, certificate);
1068
1069        #[cfg(with_metrics)]
1070        let round = certificate.round;
1071        #[cfg(with_metrics)]
1072        let cert_str = certificate.inner().to_log_str();
1073
1074        let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1075        #[cfg(with_metrics)]
1076        {
1077            if matches!(_outcome, BlockOutcome::Processed) {
1078                metrics::NUM_ROUNDS_IN_CERTIFICATE
1079                    .with_label_values(&[cert_str, round.type_name()])
1080                    .observe(round.number() as f64);
1081            }
1082        }
1083        Ok((info, actions))
1084    }
1085
1086    /// Processes a timeout certificate
1087    #[instrument(skip_all, fields(
1088        nick = self.nickname,
1089        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1090        height = %certificate.inner().height(),
1091    ))]
1092    pub async fn handle_timeout_certificate(
1093        &self,
1094        certificate: TimeoutCertificate,
1095    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1096        trace!("{} <-- {:?}", self.nickname, certificate);
1097        self.process_timeout(certificate).await
1098    }
1099
1100    #[instrument(skip_all, fields(
1101        nick = self.nickname,
1102        chain_id = format!("{:.8}", query.chain_id)
1103    ))]
1104    pub async fn handle_chain_info_query(
1105        &self,
1106        query: ChainInfoQuery,
1107    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1108        trace!("{} <-- {:?}", self.nickname, query);
1109        #[cfg(with_metrics)]
1110        metrics::CHAIN_INFO_QUERIES.inc();
1111        let result = self
1112            .query_chain_worker(query.chain_id, move |callback| {
1113                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1114            })
1115            .await;
1116        trace!("{} --> {:?}", self.nickname, result);
1117        result
1118    }
1119
1120    #[instrument(skip_all, fields(
1121        nick = self.nickname,
1122        chain_id = format!("{:.8}", chain_id)
1123    ))]
1124    pub async fn download_pending_blob(
1125        &self,
1126        chain_id: ChainId,
1127        blob_id: BlobId,
1128    ) -> Result<Blob, WorkerError> {
1129        trace!(
1130            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1131            self.nickname
1132        );
1133        let result = self
1134            .query_chain_worker(chain_id, move |callback| {
1135                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1136            })
1137            .await;
1138        trace!(
1139            "{} --> {:?}",
1140            self.nickname,
1141            result.as_ref().map(|_| blob_id)
1142        );
1143        result
1144    }
1145
1146    #[instrument(skip_all, fields(
1147        nick = self.nickname,
1148        chain_id = format!("{:.8}", chain_id)
1149    ))]
1150    pub async fn handle_pending_blob(
1151        &self,
1152        chain_id: ChainId,
1153        blob: Blob,
1154    ) -> Result<ChainInfoResponse, WorkerError> {
1155        let blob_id = blob.id();
1156        trace!(
1157            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1158            self.nickname
1159        );
1160        let result = self
1161            .query_chain_worker(chain_id, move |callback| {
1162                ChainWorkerRequest::HandlePendingBlob { blob, callback }
1163            })
1164            .await;
1165        trace!(
1166            "{} --> {:?}",
1167            self.nickname,
1168            result.as_ref().map(|_| blob_id)
1169        );
1170        result
1171    }
1172
1173    #[instrument(skip_all, fields(
1174        nick = self.nickname,
1175        chain_id = format!("{:.8}", request.target_chain_id())
1176    ))]
1177    pub async fn handle_cross_chain_request(
1178        &self,
1179        request: CrossChainRequest,
1180    ) -> Result<NetworkActions, WorkerError> {
1181        trace!("{} <-- {:?}", self.nickname, request);
1182        match request {
1183            CrossChainRequest::UpdateRecipient {
1184                sender,
1185                recipient,
1186                bundles,
1187            } => {
1188                let mut actions = NetworkActions::default();
1189                let origin = sender;
1190                let Some(height) = self
1191                    .process_cross_chain_update(origin, recipient, bundles)
1192                    .await?
1193                else {
1194                    return Ok(actions);
1195                };
1196                actions.notifications.push(Notification {
1197                    chain_id: recipient,
1198                    reason: Reason::NewIncomingBundle { origin, height },
1199                });
1200                actions
1201                    .cross_chain_requests
1202                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1203                        sender,
1204                        recipient,
1205                        latest_height: height,
1206                    });
1207                Ok(actions)
1208            }
1209            CrossChainRequest::ConfirmUpdatedRecipient {
1210                sender,
1211                recipient,
1212                latest_height,
1213            } => {
1214                self.query_chain_worker(sender, move |callback| {
1215                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1216                        recipient,
1217                        latest_height,
1218                        callback,
1219                    }
1220                })
1221                .await?;
1222                Ok(NetworkActions::default())
1223            }
1224        }
1225    }
1226
1227    /// Updates the received certificate trackers to at least the given values.
1228    #[instrument(skip_all, fields(
1229        nickname = %self.nickname,
1230        chain_id = %chain_id,
1231        num_trackers = %new_trackers.len()
1232    ))]
1233    pub async fn update_received_certificate_trackers(
1234        &self,
1235        chain_id: ChainId,
1236        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1237    ) -> Result<(), WorkerError> {
1238        self.query_chain_worker(chain_id, move |callback| {
1239            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1240                new_trackers,
1241                callback,
1242            }
1243        })
1244        .await
1245    }
1246
1247    /// Gets preprocessed block hashes in a given height range.
1248    #[instrument(skip_all, fields(
1249        nickname = %self.nickname,
1250        chain_id = %chain_id,
1251        start = %start,
1252        end = %end
1253    ))]
1254    pub async fn get_preprocessed_block_hashes(
1255        &self,
1256        chain_id: ChainId,
1257        start: BlockHeight,
1258        end: BlockHeight,
1259    ) -> Result<Vec<CryptoHash>, WorkerError> {
1260        self.query_chain_worker(chain_id, move |callback| {
1261            ChainWorkerRequest::GetPreprocessedBlockHashes {
1262                start,
1263                end,
1264                callback,
1265            }
1266        })
1267        .await
1268    }
1269
1270    /// Gets the next block height to receive from an inbox.
1271    #[instrument(skip_all, fields(
1272        nickname = %self.nickname,
1273        chain_id = %chain_id,
1274        origin = %origin
1275    ))]
1276    pub async fn get_inbox_next_height(
1277        &self,
1278        chain_id: ChainId,
1279        origin: ChainId,
1280    ) -> Result<BlockHeight, WorkerError> {
1281        self.query_chain_worker(chain_id, move |callback| {
1282            ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1283        })
1284        .await
1285    }
1286
1287    /// Gets locking blobs for specific blob IDs.
1288    /// Returns `Ok(None)` if any of the blobs is not found.
1289    #[instrument(skip_all, fields(
1290        nickname = %self.nickname,
1291        chain_id = %chain_id,
1292        num_blob_ids = %blob_ids.len()
1293    ))]
1294    pub async fn get_locking_blobs(
1295        &self,
1296        chain_id: ChainId,
1297        blob_ids: Vec<BlobId>,
1298    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1299        self.query_chain_worker(chain_id, move |callback| {
1300            ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1301        })
1302        .await
1303    }
1304
1305    /// Gets block hashes for the given heights.
1306    pub async fn get_block_hashes(
1307        &self,
1308        chain_id: ChainId,
1309        heights: Vec<BlockHeight>,
1310    ) -> Result<Vec<CryptoHash>, WorkerError> {
1311        self.query_chain_worker(chain_id, move |callback| {
1312            ChainWorkerRequest::GetBlockHashes { heights, callback }
1313        })
1314        .await
1315    }
1316
1317    /// Gets proposed blobs from the manager for specified blob IDs.
1318    pub async fn get_proposed_blobs(
1319        &self,
1320        chain_id: ChainId,
1321        blob_ids: Vec<BlobId>,
1322    ) -> Result<Vec<Blob>, WorkerError> {
1323        self.query_chain_worker(chain_id, move |callback| {
1324            ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1325        })
1326        .await
1327    }
1328
1329    /// Gets event subscriptions from the chain.
1330    pub async fn get_event_subscriptions(
1331        &self,
1332        chain_id: ChainId,
1333    ) -> Result<EventSubscriptionsResult, WorkerError> {
1334        self.query_chain_worker(chain_id, |callback| {
1335            ChainWorkerRequest::GetEventSubscriptions { callback }
1336        })
1337        .await
1338    }
1339
1340    /// Gets the next expected event index for a stream.
1341    pub async fn get_next_expected_event(
1342        &self,
1343        chain_id: ChainId,
1344        stream_id: StreamId,
1345    ) -> Result<Option<u32>, WorkerError> {
1346        self.query_chain_worker(chain_id, move |callback| {
1347            ChainWorkerRequest::GetNextExpectedEvent {
1348                stream_id,
1349                callback,
1350            }
1351        })
1352        .await
1353    }
1354
1355    /// Gets received certificate trackers.
1356    pub async fn get_received_certificate_trackers(
1357        &self,
1358        chain_id: ChainId,
1359    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1360        self.query_chain_worker(chain_id, |callback| {
1361            ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1362        })
1363        .await
1364    }
1365
1366    /// Gets tip state and outbox info for next_outbox_heights calculation.
1367    pub async fn get_tip_state_and_outbox_info(
1368        &self,
1369        chain_id: ChainId,
1370        receiver_id: ChainId,
1371    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1372        self.query_chain_worker(chain_id, move |callback| {
1373            ChainWorkerRequest::GetTipStateAndOutboxInfo {
1374                receiver_id,
1375                callback,
1376            }
1377        })
1378        .await
1379    }
1380
1381    /// Gets the next height to preprocess.
1382    pub async fn get_next_height_to_preprocess(
1383        &self,
1384        chain_id: ChainId,
1385    ) -> Result<BlockHeight, WorkerError> {
1386        self.query_chain_worker(chain_id, |callback| {
1387            ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1388        })
1389        .await
1390    }
1391}
1392
1393#[cfg(with_testing)]
1394impl<StorageClient> WorkerState<StorageClient>
1395where
1396    StorageClient: Storage,
1397{
1398    /// Gets a reference to the validator's [`ValidatorPublicKey`].
1399    ///
1400    /// # Panics
1401    ///
1402    /// If the validator doesn't have a key pair assigned to it.
1403    #[instrument(level = "trace", skip(self))]
1404    pub fn public_key(&self) -> ValidatorPublicKey {
1405        self.chain_worker_config
1406            .key_pair()
1407            .expect(
1408                "Test validator should have a key pair assigned to it \
1409                in order to obtain it's public key",
1410            )
1411            .public()
1412    }
1413}