linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{
15        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16    },
17    doc_scalar,
18    hashed::Hashed,
19    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20    time::Instant,
21    util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
27    types::{
28        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
29        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
30    },
31    ChainError, ChainStateView,
32};
33use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
34use linera_storage::Storage;
35use linera_views::{context::InactiveContext, ViewError};
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
39use tracing::{instrument, trace, warn};
40
/// Re-export of [`ChainWorkerRequestReceiver`], [`ChainWorkerRequestSender`] and
/// [`EventSubscriptionsResult`] for use by other crate modules.
42pub(crate) use crate::chain_worker::{
43    ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult,
44};
45use crate::{
46    chain_worker::{
47        BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
48    },
49    client::ListeningMode,
50    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
51    join_set_ext::{JoinSet, JoinSetExt},
52    notifier::Notifier,
53    value_cache::ValueCache,
54    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
55};
56
57#[cfg(test)]
58#[path = "unit_tests/worker_tests.rs"]
59mod worker_tests;
60
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec, register_int_gauge,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge};

    /// Round number of certificates, labelled by certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Round number of block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions in recorded blocks.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles in recorded blocks.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of messages carried by incoming bundles in recorded blocks.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations in recorded blocks.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per recorded block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per recorded block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per recorded block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Number of blocks added to chains.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Number of confirmed block certificates signed, labelled by validator name.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Number of cached chain worker channel endpoints in the map.
    pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock<IntGauge> = LazyLock::new(|| {
        register_int_gauge(
            "chain_worker_endpoints_cached",
            "Number of cached chain worker channel endpoints",
        )
    });

    /// Snapshot of the figures read from a confirmed block certificate, kept so that
    /// they can be recorded later without holding on to the certificate.
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Reads the round information, the transaction/bundle/message/operation counts
        /// and the names of the signing validators out of `certificate`.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;
            let round = &certificate.round;

            let confirmed_incoming_messages = body
                .incoming_bundles()
                .map(|bundle| bundle.messages().count())
                .sum::<usize>() as u64;
            let validators_with_signatures = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: round.type_name(),
                round_number: round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: body.incoming_bundles().count() as u64,
                confirmed_incoming_messages,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures,
            }
        }

        /// Pushes the captured figures into the Prometheus collectors of this module.
        pub fn record(self) {
            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(f64::from(round_number));
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            // The running totals are only touched for blocks that contain transactions,
            // and each individual total only when its own amount is non-zero.
            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                let totals: [(&IntCounter, u64); 3] = [
                    (&*INCOMING_BUNDLE_COUNT, confirmed_incoming_bundles),
                    (&*INCOMING_MESSAGE_COUNT, confirmed_incoming_messages),
                    (&*OPERATION_COUNT, confirmed_operations),
                ];
                for (counter, amount) in totals {
                    if amount > 0 {
                        counter.inc_by(amount);
                    }
                }
            }

            for validator_name in validators_with_signatures {
                CERTIFICATES_SIGNED
                    .with_label_values(&[&validator_name])
                    .inc();
            }
        }
    }
}
222
223/// Instruct the networking layer to send cross-chain requests and/or push notifications.
224#[derive(Default, Debug)]
225pub struct NetworkActions {
226    /// The cross-chain requests
227    pub cross_chain_requests: Vec<CrossChainRequest>,
228    /// The push notifications.
229    pub notifications: Vec<Notification>,
230}
231
232impl NetworkActions {
233    pub fn extend(&mut self, other: NetworkActions) {
234        self.cross_chain_requests.extend(other.cross_chain_requests);
235        self.notifications.extend(other.notifications);
236    }
237}
238
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// Notification that a chain has a new certified block or a new message.
pub struct Notification {
    /// The chain this notification is about.
    pub chain_id: ChainId,
    /// Why the notification was issued; see [`Reason`].
    pub reason: Reason,
}

// Describes `Notification` through the `doc_scalar!` macro with the text below.
doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
250
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// Reason for the notification.
pub enum Reason {
    /// A new block; carries a block height and a hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// New events; carries a block height, a hash and the set of event streams.
    NewEvents {
        height: BlockHeight,
        hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
    /// A new incoming bundle; carries the origin chain and a block height.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round; carries a block height and the round.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
    /// A block was executed; carries a block height and a hash.
    BlockExecuted {
        height: BlockHeight,
        hash: CryptoHash,
    },
}
276
/// Error type for worker operations.
///
/// Variants marked `#[error(transparent)]` forward the display of the wrapped error;
/// the others carry their own message. See [`WorkerError::is_local`] for the split
/// between locally caused errors and errors that a peer could have triggered.
#[derive(Debug, Error)]
pub enum WorkerError {
    /// Wraps a [`CryptoError`].
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    /// Wraps an [`ArithmeticError`].
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// Wraps a [`ViewError`].
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// The listed certificate hashes are in `confirmed_log` but not in storage.
    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// Wraps a boxed [`ChainError`].
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    /// Wraps a [`bcs::Error`].
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    // Chain access control
    /// The block was not signed by an authorized owner.
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    /// The operations in the block are not authenticated by the proper owner.
    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    /// The given block is not at the height the chain expects next.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    /// The given epoch is unexpected for a chain that is at `chain_epoch`.
    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    /// The listed events were not found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    /// A cross-chain request was invalid.
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    /// The block does not contain the expected hash of the previous block.
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    /// The submitted outcome differs from the one computed by executing the block.
    #[error(
        "The given outcome is not what we computed after executing the block.\n\
        Computed: {computed:#?}\n\
        Submitted: {submitted:#?}"
    )]
    IncorrectOutcome {
        computed: Box<BlockExecutionOutcome>,
        submitted: Box<BlockExecutionOutcome>,
    },
    /// The block timestamp is ahead of local time by more than the grace period.
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
        ({local_time}) than block time grace period ({block_time_grace_period:?})"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period: Duration,
    },
    /// The value of the certificate is not available.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    /// The hash certificate does not match its value.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    /// A fast block attempted to query oracles.
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    /// The listed blobs were not found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    /// No `confirmed_log` entry exists at `height` for `chain_id`.
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// No `preprocessed_blocks` entry exists at `height` for `chain_id`.
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// The block proposal is invalid; the string gives the details.
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    /// The blob was not required by any pending block.
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    /// The block publishes more blobs than the given maximum.
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    /// The network description is missing.
    #[error("Missing network description")]
    MissingNetworkDescription,
    /// The `ChainWorkerActor` for `chain_id` stopped executing unexpectedly.
    #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
    ChainActorSendError {
        chain_id: ChainId,
        error: Box<dyn DynError>,
    },
    /// The `ChainWorkerActor` for `chain_id` stopped executing without responding.
    #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
    ChainActorRecvError {
        chain_id: ChainId,
        error: Box<dyn DynError>,
    },

    /// Wraps a [`web_thread_pool::Error`].
    #[error("thread error: {0}")]
    Thread(#[from] web_thread_pool::Error),
}
387
388impl WorkerError {
389    /// Returns whether this error is caused by an issue in the local node.
390    ///
391    /// Returns `false` whenever the error could be caused by a bad message from a peer.
392    pub fn is_local(&self) -> bool {
393        match self {
394            WorkerError::CryptoError(_)
395            | WorkerError::ArithmeticError(_)
396            | WorkerError::InvalidOwner
397            | WorkerError::InvalidSigner(_)
398            | WorkerError::UnexpectedBlockHeight { .. }
399            | WorkerError::InvalidEpoch { .. }
400            | WorkerError::EventsNotFound(_)
401            | WorkerError::InvalidBlockChaining
402            | WorkerError::IncorrectOutcome { .. }
403            | WorkerError::InvalidTimestamp { .. }
404            | WorkerError::MissingCertificateValue
405            | WorkerError::InvalidLiteCertificate
406            | WorkerError::FastBlockUsingOracles
407            | WorkerError::BlobsNotFound(_)
408            | WorkerError::InvalidBlockProposal(_)
409            | WorkerError::UnexpectedBlob
410            | WorkerError::TooManyPublishedBlobs(_)
411            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
412            WorkerError::BcsError(_)
413            | WorkerError::InvalidCrossChainRequest
414            | WorkerError::ViewError(_)
415            | WorkerError::ConfirmedLogEntryNotFound { .. }
416            | WorkerError::PreprocessedBlocksEntryNotFound { .. }
417            | WorkerError::MissingNetworkDescription
418            | WorkerError::ChainActorSendError { .. }
419            | WorkerError::ChainActorRecvError { .. }
420            | WorkerError::Thread(_)
421            | WorkerError::ReadCertificatesError(_) => true,
422            WorkerError::ChainError(chain_error) => chain_error.is_local(),
423        }
424    }
425}
426
427impl From<ChainError> for WorkerError {
428    #[instrument(level = "trace", skip(chain_error))]
429    fn from(chain_error: ChainError) -> Self {
430        match chain_error {
431            ChainError::ExecutionError(execution_error, context) => match *execution_error {
432                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
433                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
434                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
435                    execution_error,
436                    context,
437                ))),
438            },
439            error => Self::ChainError(Box::new(error)),
440        }
441    }
442}
443
#[cfg(with_testing)]
impl WorkerError {
    /// Returns the inner [`ExecutionError`] in this error.
    ///
    /// # Panics
    ///
    /// If this is not caused by an [`ExecutionError`], or if the context attached to the
    /// execution error differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        match self {
            WorkerError::ChainError(chain_error) => match *chain_error {
                ChainError::ExecutionError(execution_error, context) => {
                    assert_eq!(context, expected_context);
                    *execution_error
                }
                unexpected => panic!("Expected an `ExecutionError`. Got: {unexpected:#?}"),
            },
            unexpected => panic!("Expected an `ExecutionError`. Got: {unexpected:#?}"),
        }
    }
}
465
/// State of a worker in a validator or a local node.
pub struct WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// A name used for logging
    nickname: String,
    /// Access to local persistent storage.
    storage: StorageClient,
    /// Configuration options for the [`ChainWorker`]s.
    chain_worker_config: ChainWorkerConfig,
    /// Shared cache of hashed blocks, keyed by [`CryptoHash`].
    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Shared cache of inactive-context execution state views, keyed by [`CryptoHash`].
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
    /// Chains tracked by a worker, along with their listening modes.
    ///
    /// `None` when built with `new`; `Some` when built with `new_for_client`.
    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
    /// One-shot channels to notify callers when messages of a particular chain have been
    /// delivered.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The set of spawned [`ChainWorkerActor`] tasks.
    chain_worker_tasks: Arc<Mutex<JoinSet>>,
    /// The cache of running [`ChainWorkerActor`]s.
    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
}
489
490impl<StorageClient> Clone for WorkerState<StorageClient>
491where
492    StorageClient: Storage + Clone,
493{
494    fn clone(&self) -> Self {
495        WorkerState {
496            nickname: self.nickname.clone(),
497            storage: self.storage.clone(),
498            chain_worker_config: self.chain_worker_config.clone(),
499            block_cache: self.block_cache.clone(),
500            execution_state_cache: self.execution_state_cache.clone(),
501            chain_modes: self.chain_modes.clone(),
502            delivery_notifiers: self.delivery_notifiers.clone(),
503            chain_worker_tasks: self.chain_worker_tasks.clone(),
504            chain_workers: self.chain_workers.clone(),
505        }
506    }
507}
508
/// The sender endpoint for [`ChainWorkerRequest`]s.
///
/// Each message is a tuple of the request, a [`tracing::Span`] and an [`Instant`].
// NOTE(review): the span and instant presumably identify the caller's span and the
// time the request was queued — confirm against the sending side.
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
    ChainWorkerRequest<<StorageClient as Storage>::Context>,
    tracing::Span,
    Instant,
)>;

/// Maps each [`ChainId`] to its [`DeliveryNotifier`].
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
517
518impl<StorageClient> WorkerState<StorageClient>
519where
520    StorageClient: Storage,
521{
522    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
523    pub fn new(
524        nickname: String,
525        key_pair: Option<ValidatorSecretKey>,
526        storage: StorageClient,
527        block_cache_size: usize,
528        execution_state_cache_size: usize,
529    ) -> Self {
530        WorkerState {
531            nickname,
532            storage,
533            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
534            block_cache: Arc::new(ValueCache::new(block_cache_size)),
535            execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
536            chain_modes: None,
537            delivery_notifiers: Arc::default(),
538            chain_worker_tasks: Arc::default(),
539            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
540        }
541    }
542
543    #[instrument(level = "trace", skip(nickname, storage))]
544    pub fn new_for_client(
545        nickname: String,
546        storage: StorageClient,
547        chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
548        block_cache_size: usize,
549        execution_state_cache_size: usize,
550    ) -> Self {
551        WorkerState {
552            nickname,
553            storage,
554            chain_worker_config: ChainWorkerConfig::default(),
555            block_cache: Arc::new(ValueCache::new(block_cache_size)),
556            execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
557            chain_modes: Some(chain_modes),
558            delivery_notifiers: Arc::default(),
559            chain_worker_tasks: Arc::default(),
560            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
561        }
562    }
563
564    #[instrument(level = "trace", skip(self, value))]
565    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
566        self.chain_worker_config.allow_inactive_chains = value;
567        self
568    }
569
570    #[instrument(level = "trace", skip(self, value))]
571    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
572        self.chain_worker_config
573            .allow_messages_from_deprecated_epochs = value;
574        self
575    }
576
577    #[instrument(level = "trace", skip(self, value))]
578    pub fn with_long_lived_services(mut self, value: bool) -> Self {
579        self.chain_worker_config.long_lived_services = value;
580        self
581    }
582
583    /// Returns an instance with the specified block time grace period.
584    ///
585    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
586    /// will wait until that timestamp before voting.
587    #[instrument(level = "trace", skip(self))]
588    pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
589        self.chain_worker_config.block_time_grace_period = block_time_grace_period;
590        self
591    }
592
593    /// Returns an instance with the specified chain worker TTL.
594    ///
595    /// Idle chain workers free their memory after that duration without requests.
596    #[instrument(level = "trace", skip(self))]
597    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
598        self.chain_worker_config.ttl = chain_worker_ttl;
599        self
600    }
601
602    /// Returns an instance with the specified sender chain worker TTL.
603    ///
604    /// Idle sender chain workers free their memory after that duration without requests.
605    #[instrument(level = "trace", skip(self))]
606    pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
607        self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
608        self
609    }
610
611    /// Returns an instance with the specified maximum size for received_log entries.
612    ///
613    /// Sizes below `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` should be avoided.
614    #[instrument(level = "trace", skip(self))]
615    pub fn with_chain_info_max_received_log_entries(
616        mut self,
617        chain_info_max_received_log_entries: usize,
618    ) -> Self {
619        if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
620            warn!(
621                "The value set for the maximum size of received_log entries \
622                   may not be compatible with the latest clients: {} instead of {}",
623                chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
624            );
625        }
626        self.chain_worker_config.chain_info_max_received_log_entries =
627            chain_info_max_received_log_entries;
628        self
629    }
630
631    #[instrument(level = "trace", skip(self))]
632    pub fn nickname(&self) -> &str {
633        &self.nickname
634    }
635
636    /// Returns the storage client so that it can be manipulated or queried.
637    #[instrument(level = "trace", skip(self))]
638    #[cfg(not(feature = "test"))]
639    pub(crate) fn storage_client(&self) -> &StorageClient {
640        &self.storage
641    }
642
643    /// Returns the storage client so that it can be manipulated or queried by tests in other
644    /// crates.
645    #[instrument(level = "trace", skip(self))]
646    #[cfg(feature = "test")]
647    pub fn storage_client(&self) -> &StorageClient {
648        &self.storage
649    }
650
651    #[instrument(level = "trace", skip(self, certificate))]
652    pub(crate) async fn full_certificate(
653        &self,
654        certificate: LiteCertificate<'_>,
655    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
656        let block = self
657            .block_cache
658            .get(&certificate.value.value_hash)
659            .ok_or(WorkerError::MissingCertificateValue)?;
660
661        match certificate.value.kind {
662            linera_chain::types::CertificateKind::Confirmed => {
663                let value = ConfirmedBlock::from_hashed(block);
664                Ok(Either::Left(
665                    certificate
666                        .with_value(value)
667                        .ok_or(WorkerError::InvalidLiteCertificate)?,
668                ))
669            }
670            linera_chain::types::CertificateKind::Validated => {
671                let value = ValidatedBlock::from_hashed(block);
672                Ok(Either::Right(
673                    certificate
674                        .with_value(value)
675                        .ok_or(WorkerError::InvalidLiteCertificate)?,
676                ))
677            }
678            _ => Err(WorkerError::InvalidLiteCertificate),
679        }
680    }
681}
682
/// A certificate value type whose certificates can be processed by a [`WorkerState`].
///
/// On non-web targets `trait_variant::make(Send)` is applied to this trait.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker`.
    ///
    /// On success, returns the chain info response together with the network actions
    /// (cross-chain requests and notifications) produced by the processing.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
691
692impl ProcessableCertificate for ConfirmedBlock {
693    async fn process_certificate<S: Storage + Clone + 'static>(
694        worker: &WorkerState<S>,
695        certificate: ConfirmedBlockCertificate,
696    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
697        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
698    }
699}
700
701impl ProcessableCertificate for ValidatedBlock {
702    async fn process_certificate<S: Storage + Clone + 'static>(
703        worker: &WorkerState<S>,
704        certificate: ValidatedBlockCertificate,
705    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
706        Box::pin(worker.handle_validated_certificate(certificate)).await
707    }
708}
709
710impl ProcessableCertificate for Timeout {
711    async fn process_certificate<S: Storage + Clone + 'static>(
712        worker: &WorkerState<S>,
713        certificate: TimeoutCertificate,
714    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
715        worker.handle_timeout_certificate(certificate).await
716    }
717}
718
719impl<StorageClient> WorkerState<StorageClient>
720where
721    StorageClient: Storage + Clone + 'static,
722{
723    #[instrument(level = "trace", skip(self, certificate, notifier))]
724    #[inline]
725    pub async fn fully_handle_certificate_with_notifications<T>(
726        &self,
727        certificate: GenericCertificate<T>,
728        notifier: &impl Notifier,
729    ) -> Result<ChainInfoResponse, WorkerError>
730    where
731        T: ProcessableCertificate,
732    {
733        let notifications = (*notifier).clone();
734        let this = self.clone();
735        linera_base::task::spawn(async move {
736            let (response, actions) =
737                ProcessableCertificate::process_certificate(&this, certificate).await?;
738            notifications.notify(&actions.notifications);
739            let mut requests = VecDeque::from(actions.cross_chain_requests);
740            while let Some(request) = requests.pop_front() {
741                let actions = this.handle_cross_chain_request(request).await?;
742                requests.extend(actions.cross_chain_requests);
743                notifications.notify(&actions.notifications);
744            }
745            Ok(response)
746        })
747        .await
748    }
749
750    /// Tries to execute a block proposal without any verification other than block execution.
751    #[instrument(level = "trace", skip(self, block))]
752    pub async fn stage_block_execution(
753        &self,
754        block: ProposedBlock,
755        round: Option<u32>,
756        published_blobs: Vec<Blob>,
757    ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
758        self.query_chain_worker(block.chain_id, move |callback| {
759            ChainWorkerRequest::StageBlockExecution {
760                block,
761                round,
762                published_blobs,
763                callback,
764            }
765        })
766        .await
767    }
768
769    /// Executes a [`Query`] for an application's state on a specific chain.
770    ///
771    /// If `block_hash` is specified, system will query the application's state
772    /// at that block. If it doesn't exist, it uses latest state.
773    #[instrument(level = "trace", skip(self, chain_id, query))]
774    pub async fn query_application(
775        &self,
776        chain_id: ChainId,
777        query: Query,
778        block_hash: Option<CryptoHash>,
779    ) -> Result<QueryOutcome, WorkerError> {
780        self.query_chain_worker(chain_id, move |callback| {
781            ChainWorkerRequest::QueryApplication {
782                query,
783                block_hash,
784                callback,
785            }
786        })
787        .await
788    }
789
790    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
791        nickname = %self.nickname,
792        chain_id = %chain_id,
793        application_id = %application_id
794    ))]
795    pub async fn describe_application(
796        &self,
797        chain_id: ChainId,
798        application_id: ApplicationId,
799    ) -> Result<ApplicationDescription, WorkerError> {
800        self.query_chain_worker(chain_id, move |callback| {
801            ChainWorkerRequest::DescribeApplication {
802                application_id,
803                callback,
804            }
805        })
806        .await
807    }
808
809    /// Processes a confirmed block (aka a commit).
810    #[instrument(
811        level = "trace",
812        skip(self, certificate, notify_when_messages_are_delivered),
813        fields(
814            nickname = %self.nickname,
815            chain_id = %certificate.block().header.chain_id,
816            block_height = %certificate.block().header.height
817        )
818    )]
819    async fn process_confirmed_block(
820        &self,
821        certificate: ConfirmedBlockCertificate,
822        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
823    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
824        let chain_id = certificate.block().header.chain_id;
825        self.query_chain_worker(chain_id, move |callback| {
826            ChainWorkerRequest::ProcessConfirmedBlock {
827                certificate,
828                notify_when_messages_are_delivered,
829                callback,
830            }
831        })
832        .await
833    }
834
835    /// Processes a validated block issued from a multi-owner chain.
836    #[instrument(level = "trace", skip(self, certificate), fields(
837        nickname = %self.nickname,
838        chain_id = %certificate.block().header.chain_id,
839        block_height = %certificate.block().header.height
840    ))]
841    async fn process_validated_block(
842        &self,
843        certificate: ValidatedBlockCertificate,
844    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
845        let chain_id = certificate.block().header.chain_id;
846        self.query_chain_worker(chain_id, move |callback| {
847            ChainWorkerRequest::ProcessValidatedBlock {
848                certificate,
849                callback,
850            }
851        })
852        .await
853    }
854
855    /// Processes a leader timeout issued from a multi-owner chain.
856    #[instrument(level = "trace", skip(self, certificate), fields(
857        nickname = %self.nickname,
858        chain_id = %certificate.value().chain_id(),
859        height = %certificate.value().height()
860    ))]
861    async fn process_timeout(
862        &self,
863        certificate: TimeoutCertificate,
864    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
865        let chain_id = certificate.value().chain_id();
866        self.query_chain_worker(chain_id, move |callback| {
867            ChainWorkerRequest::ProcessTimeout {
868                certificate,
869                callback,
870            }
871        })
872        .await
873    }
874
875    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
876        nickname = %self.nickname,
877        origin = %origin,
878        recipient = %recipient,
879        num_bundles = %bundles.len()
880    ))]
881    async fn process_cross_chain_update(
882        &self,
883        origin: ChainId,
884        recipient: ChainId,
885        bundles: Vec<(Epoch, MessageBundle)>,
886    ) -> Result<Option<BlockHeight>, WorkerError> {
887        self.query_chain_worker(recipient, move |callback| {
888            ChainWorkerRequest::ProcessCrossChainUpdate {
889                origin,
890                bundles,
891                callback,
892            }
893        })
894        .await
895    }
896
897    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
898    #[instrument(level = "trace", skip(self, chain_id, height), fields(
899        nickname = %self.nickname,
900        chain_id = %chain_id,
901        height = %height
902    ))]
903    #[cfg(with_testing)]
904    pub async fn read_certificate(
905        &self,
906        chain_id: ChainId,
907        height: BlockHeight,
908    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
909        self.query_chain_worker(chain_id, move |callback| {
910            ChainWorkerRequest::ReadCertificate { height, callback }
911        })
912        .await
913    }
914
915    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
916    /// [`ChainId`].
917    ///
918    /// The returned view holds a lock on the chain state, which prevents the worker from changing
919    /// the state of that chain.
920    #[instrument(level = "trace", skip(self), fields(
921        nickname = %self.nickname,
922        chain_id = %chain_id
923    ))]
924    pub async fn chain_state_view(
925        &self,
926        chain_id: ChainId,
927    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
928        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
929            callback,
930        })
931        .await
932    }
933
934    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
935    #[instrument(level = "trace", skip(self, request_builder), fields(
936        nickname = %self.nickname,
937        chain_id = %chain_id
938    ))]
939    async fn query_chain_worker<Response>(
940        &self,
941        chain_id: ChainId,
942        request_builder: impl FnOnce(
943            oneshot::Sender<Result<Response, WorkerError>>,
944        ) -> ChainWorkerRequest<StorageClient::Context>,
945    ) -> Result<Response, WorkerError> {
946        // Build the request.
947        let (callback, response) = oneshot::channel();
948        let request = request_builder(callback);
949
950        // Call the endpoint, possibly a new one.
951        let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
952
953        // We just created a channel: spawn the actor.
954        if let Some((sender, receiver)) = new_channel {
955            let delivery_notifier = self
956                .delivery_notifiers
957                .lock()
958                .unwrap()
959                .entry(chain_id)
960                .or_default()
961                .clone();
962
963            let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
964                chain_modes
965                    .read()
966                    .unwrap()
967                    .get(&chain_id)
968                    .is_some_and(ListeningMode::is_full)
969            });
970
971            let actor_task = ChainWorkerActor::run(
972                self.chain_worker_config.clone(),
973                self.storage.clone(),
974                self.block_cache.clone(),
975                self.execution_state_cache.clone(),
976                self.chain_modes.clone(),
977                delivery_notifier,
978                chain_id,
979                sender,
980                receiver,
981                is_tracked,
982            );
983
984            self.chain_worker_tasks
985                .lock()
986                .unwrap()
987                .spawn_task(actor_task);
988        }
989
990        // Finally, wait a response.
991        match response.await {
992            Err(e) => {
993                // The actor endpoint was dropped. Better luck next time!
994                Err(WorkerError::ChainActorRecvError {
995                    chain_id,
996                    error: Box::new(e),
997                })
998            }
999            Ok(response) => response,
1000        }
1001    }
1002
1003    /// Find an endpoint and call it. Create the endpoint if necessary.
1004    ///
1005    /// Returns `Some((sender, receiver))` if a new channel was created and the actor needs to be
1006    /// spawned, or `None` if an existing endpoint was used.
1007    #[instrument(level = "trace", skip(self), fields(
1008        nickname = %self.nickname,
1009        chain_id = %chain_id
1010    ))]
1011    #[expect(clippy::type_complexity)]
1012    fn call_and_maybe_create_chain_worker_endpoint(
1013        &self,
1014        chain_id: ChainId,
1015        request: ChainWorkerRequest<StorageClient::Context>,
1016    ) -> Result<
1017        Option<(
1018            ChainWorkerRequestSender<StorageClient::Context>,
1019            ChainWorkerRequestReceiver<StorageClient::Context>,
1020        )>,
1021        WorkerError,
1022    > {
1023        let mut chain_workers = self.chain_workers.lock().unwrap();
1024
1025        let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
1026            (endpoint, None)
1027        } else {
1028            let (sender, receiver) = mpsc::unbounded_channel();
1029            (sender.clone(), Some((sender, receiver)))
1030        };
1031
1032        if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
1033            // The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
1034            return Err(WorkerError::ChainActorSendError {
1035                chain_id,
1036                error: Box::new(e),
1037            });
1038        }
1039
1040        // Put back the sender in the cache for next time.
1041        chain_workers.insert(chain_id, sender);
1042        #[cfg(with_metrics)]
1043        metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64);
1044
1045        Ok(new_channel)
1046    }
1047
1048    #[instrument(skip_all, fields(
1049        nick = self.nickname,
1050        chain_id = format!("{:.8}", proposal.content.block.chain_id),
1051        height = %proposal.content.block.height,
1052    ))]
1053    pub async fn handle_block_proposal(
1054        &self,
1055        proposal: BlockProposal,
1056    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1057        trace!("{} <-- {:?}", self.nickname, proposal);
1058        #[cfg(with_metrics)]
1059        let round = proposal.content.round;
1060        let response = self
1061            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
1062                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
1063            })
1064            .await?;
1065        #[cfg(with_metrics)]
1066        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1067            .with_label_values(&[round.type_name()])
1068            .observe(round.number() as f64);
1069        Ok(response)
1070    }
1071
1072    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
1073    // Other fields will be included in handle_certificate's span.
1074    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1075    pub async fn handle_lite_certificate(
1076        &self,
1077        certificate: LiteCertificate<'_>,
1078        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1079    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1080        match self.full_certificate(certificate).await? {
1081            Either::Left(confirmed) => {
1082                Box::pin(
1083                    self.handle_confirmed_certificate(
1084                        confirmed,
1085                        notify_when_messages_are_delivered,
1086                    ),
1087                )
1088                .await
1089            }
1090            Either::Right(validated) => {
1091                if let Some(notifier) = notify_when_messages_are_delivered {
1092                    // Nothing to wait for.
1093                    if let Err(()) = notifier.send(()) {
1094                        warn!("Failed to notify message delivery to caller");
1095                    }
1096                }
1097                Box::pin(self.handle_validated_certificate(validated)).await
1098            }
1099        }
1100    }
1101
1102    /// Processes a confirmed block certificate.
1103    #[instrument(skip_all, fields(
1104        nick = self.nickname,
1105        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1106        height = %certificate.block().header.height,
1107    ))]
1108    pub async fn handle_confirmed_certificate(
1109        &self,
1110        certificate: ConfirmedBlockCertificate,
1111        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1112    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1113        trace!("{} <-- {:?}", self.nickname, certificate);
1114        #[cfg(with_metrics)]
1115        let metrics_data = metrics::MetricsData::new(&certificate);
1116
1117        let (info, actions, _outcome) =
1118            Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1119                .await?;
1120
1121        #[cfg(with_metrics)]
1122        if matches!(_outcome, BlockOutcome::Processed) {
1123            metrics_data.record();
1124        }
1125        Ok((info, actions))
1126    }
1127
1128    /// Processes a validated block certificate.
1129    #[instrument(skip_all, fields(
1130        nick = self.nickname,
1131        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1132        height = %certificate.block().header.height,
1133    ))]
1134    pub async fn handle_validated_certificate(
1135        &self,
1136        certificate: ValidatedBlockCertificate,
1137    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1138        trace!("{} <-- {:?}", self.nickname, certificate);
1139
1140        #[cfg(with_metrics)]
1141        let round = certificate.round;
1142        #[cfg(with_metrics)]
1143        let cert_str = certificate.inner().to_log_str();
1144
1145        let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1146        #[cfg(with_metrics)]
1147        {
1148            if matches!(_outcome, BlockOutcome::Processed) {
1149                metrics::NUM_ROUNDS_IN_CERTIFICATE
1150                    .with_label_values(&[cert_str, round.type_name()])
1151                    .observe(round.number() as f64);
1152            }
1153        }
1154        Ok((info, actions))
1155    }
1156
1157    /// Processes a timeout certificate
1158    #[instrument(skip_all, fields(
1159        nick = self.nickname,
1160        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1161        height = %certificate.inner().height(),
1162    ))]
1163    pub async fn handle_timeout_certificate(
1164        &self,
1165        certificate: TimeoutCertificate,
1166    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1167        trace!("{} <-- {:?}", self.nickname, certificate);
1168        self.process_timeout(certificate).await
1169    }
1170
1171    #[instrument(skip_all, fields(
1172        nick = self.nickname,
1173        chain_id = format!("{:.8}", query.chain_id)
1174    ))]
1175    pub async fn handle_chain_info_query(
1176        &self,
1177        query: ChainInfoQuery,
1178    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1179        trace!("{} <-- {:?}", self.nickname, query);
1180        #[cfg(with_metrics)]
1181        metrics::CHAIN_INFO_QUERIES.inc();
1182        let result = self
1183            .query_chain_worker(query.chain_id, move |callback| {
1184                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1185            })
1186            .await;
1187        trace!("{} --> {:?}", self.nickname, result);
1188        result
1189    }
1190
1191    #[instrument(skip_all, fields(
1192        nick = self.nickname,
1193        chain_id = format!("{:.8}", chain_id)
1194    ))]
1195    pub async fn download_pending_blob(
1196        &self,
1197        chain_id: ChainId,
1198        blob_id: BlobId,
1199    ) -> Result<Blob, WorkerError> {
1200        trace!(
1201            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1202            self.nickname
1203        );
1204        let result = self
1205            .query_chain_worker(chain_id, move |callback| {
1206                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1207            })
1208            .await;
1209        trace!(
1210            "{} --> {:?}",
1211            self.nickname,
1212            result.as_ref().map(|_| blob_id)
1213        );
1214        result
1215    }
1216
1217    #[instrument(skip_all, fields(
1218        nick = self.nickname,
1219        chain_id = format!("{:.8}", chain_id)
1220    ))]
1221    pub async fn handle_pending_blob(
1222        &self,
1223        chain_id: ChainId,
1224        blob: Blob,
1225    ) -> Result<ChainInfoResponse, WorkerError> {
1226        let blob_id = blob.id();
1227        trace!(
1228            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1229            self.nickname
1230        );
1231        let result = self
1232            .query_chain_worker(chain_id, move |callback| {
1233                ChainWorkerRequest::HandlePendingBlob { blob, callback }
1234            })
1235            .await;
1236        trace!(
1237            "{} --> {:?}",
1238            self.nickname,
1239            result.as_ref().map(|_| blob_id)
1240        );
1241        result
1242    }
1243
1244    #[instrument(skip_all, fields(
1245        nick = self.nickname,
1246        chain_id = format!("{:.8}", request.target_chain_id())
1247    ))]
1248    pub async fn handle_cross_chain_request(
1249        &self,
1250        request: CrossChainRequest,
1251    ) -> Result<NetworkActions, WorkerError> {
1252        trace!("{} <-- {:?}", self.nickname, request);
1253        match request {
1254            CrossChainRequest::UpdateRecipient {
1255                sender,
1256                recipient,
1257                bundles,
1258            } => {
1259                let mut actions = NetworkActions::default();
1260                let origin = sender;
1261                let Some(height) = self
1262                    .process_cross_chain_update(origin, recipient, bundles)
1263                    .await?
1264                else {
1265                    return Ok(actions);
1266                };
1267                actions.notifications.push(Notification {
1268                    chain_id: recipient,
1269                    reason: Reason::NewIncomingBundle { origin, height },
1270                });
1271                actions
1272                    .cross_chain_requests
1273                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1274                        sender,
1275                        recipient,
1276                        latest_height: height,
1277                    });
1278                Ok(actions)
1279            }
1280            CrossChainRequest::ConfirmUpdatedRecipient {
1281                sender,
1282                recipient,
1283                latest_height,
1284            } => {
1285                self.query_chain_worker(sender, move |callback| {
1286                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1287                        recipient,
1288                        latest_height,
1289                        callback,
1290                    }
1291                })
1292                .await?;
1293                Ok(NetworkActions::default())
1294            }
1295        }
1296    }
1297
1298    /// Updates the received certificate trackers to at least the given values.
1299    #[instrument(skip_all, fields(
1300        nickname = %self.nickname,
1301        chain_id = %chain_id,
1302        num_trackers = %new_trackers.len()
1303    ))]
1304    pub async fn update_received_certificate_trackers(
1305        &self,
1306        chain_id: ChainId,
1307        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1308    ) -> Result<(), WorkerError> {
1309        self.query_chain_worker(chain_id, move |callback| {
1310            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1311                new_trackers,
1312                callback,
1313            }
1314        })
1315        .await
1316    }
1317
1318    /// Gets preprocessed block hashes in a given height range.
1319    #[instrument(skip_all, fields(
1320        nickname = %self.nickname,
1321        chain_id = %chain_id,
1322        start = %start,
1323        end = %end
1324    ))]
1325    pub async fn get_preprocessed_block_hashes(
1326        &self,
1327        chain_id: ChainId,
1328        start: BlockHeight,
1329        end: BlockHeight,
1330    ) -> Result<Vec<CryptoHash>, WorkerError> {
1331        self.query_chain_worker(chain_id, move |callback| {
1332            ChainWorkerRequest::GetPreprocessedBlockHashes {
1333                start,
1334                end,
1335                callback,
1336            }
1337        })
1338        .await
1339    }
1340
1341    /// Gets the next block height to receive from an inbox.
1342    #[instrument(skip_all, fields(
1343        nickname = %self.nickname,
1344        chain_id = %chain_id,
1345        origin = %origin
1346    ))]
1347    pub async fn get_inbox_next_height(
1348        &self,
1349        chain_id: ChainId,
1350        origin: ChainId,
1351    ) -> Result<BlockHeight, WorkerError> {
1352        self.query_chain_worker(chain_id, move |callback| {
1353            ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1354        })
1355        .await
1356    }
1357
1358    /// Gets locking blobs for specific blob IDs.
1359    /// Returns `Ok(None)` if any of the blobs is not found.
1360    #[instrument(skip_all, fields(
1361        nickname = %self.nickname,
1362        chain_id = %chain_id,
1363        num_blob_ids = %blob_ids.len()
1364    ))]
1365    pub async fn get_locking_blobs(
1366        &self,
1367        chain_id: ChainId,
1368        blob_ids: Vec<BlobId>,
1369    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1370        self.query_chain_worker(chain_id, move |callback| {
1371            ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1372        })
1373        .await
1374    }
1375
1376    /// Gets block hashes for the given heights.
1377    pub async fn get_block_hashes(
1378        &self,
1379        chain_id: ChainId,
1380        heights: Vec<BlockHeight>,
1381    ) -> Result<Vec<CryptoHash>, WorkerError> {
1382        self.query_chain_worker(chain_id, move |callback| {
1383            ChainWorkerRequest::GetBlockHashes { heights, callback }
1384        })
1385        .await
1386    }
1387
1388    /// Gets proposed blobs from the manager for specified blob IDs.
1389    pub async fn get_proposed_blobs(
1390        &self,
1391        chain_id: ChainId,
1392        blob_ids: Vec<BlobId>,
1393    ) -> Result<Vec<Blob>, WorkerError> {
1394        self.query_chain_worker(chain_id, move |callback| {
1395            ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1396        })
1397        .await
1398    }
1399
1400    /// Gets event subscriptions from the chain.
1401    pub async fn get_event_subscriptions(
1402        &self,
1403        chain_id: ChainId,
1404    ) -> Result<EventSubscriptionsResult, WorkerError> {
1405        self.query_chain_worker(chain_id, |callback| {
1406            ChainWorkerRequest::GetEventSubscriptions { callback }
1407        })
1408        .await
1409    }
1410
1411    /// Gets the next expected event index for a stream.
1412    pub async fn get_next_expected_event(
1413        &self,
1414        chain_id: ChainId,
1415        stream_id: StreamId,
1416    ) -> Result<Option<u32>, WorkerError> {
1417        self.query_chain_worker(chain_id, move |callback| {
1418            ChainWorkerRequest::GetNextExpectedEvent {
1419                stream_id,
1420                callback,
1421            }
1422        })
1423        .await
1424    }
1425
1426    /// Gets received certificate trackers.
1427    pub async fn get_received_certificate_trackers(
1428        &self,
1429        chain_id: ChainId,
1430    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1431        self.query_chain_worker(chain_id, |callback| {
1432            ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1433        })
1434        .await
1435    }
1436
1437    /// Gets tip state and outbox info for next_outbox_heights calculation.
1438    pub async fn get_tip_state_and_outbox_info(
1439        &self,
1440        chain_id: ChainId,
1441        receiver_id: ChainId,
1442    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1443        self.query_chain_worker(chain_id, move |callback| {
1444            ChainWorkerRequest::GetTipStateAndOutboxInfo {
1445                receiver_id,
1446                callback,
1447            }
1448        })
1449        .await
1450    }
1451
1452    /// Gets the next height to preprocess.
1453    pub async fn get_next_height_to_preprocess(
1454        &self,
1455        chain_id: ChainId,
1456    ) -> Result<BlockHeight, WorkerError> {
1457        self.query_chain_worker(chain_id, |callback| {
1458            ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1459        })
1460        .await
1461    }
1462}
1463
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the validator's [`ValidatorPublicKey`], taken from the key pair in the chain
    /// worker configuration.
    ///
    /// Only available in test builds.
    ///
    /// # Panics
    ///
    /// If the validator doesn't have a key pair assigned to it.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        self.chain_worker_config
            .key_pair()
            .expect(
                "Test validator should have a key pair assigned to it \
                in order to obtain its public key",
            )
            .public()
    }
}