linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14    data_types::{
15        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
16        Timestamp,
17    },
18    doc_scalar,
19    hashed::Hashed,
20    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
21};
22#[cfg(with_testing)]
23use linera_chain::ChainExecutionContext;
24use linera_chain::{
25    data_types::{
26        BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock,
27    },
28    types::{
29        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
30        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
31    },
32    ChainError, ChainStateView,
33};
34use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
35use linera_storage::{Clock as _, Storage};
36use linera_views::{context::InactiveContext, ViewError};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39use tokio::sync::{oneshot, OwnedRwLockReadGuard};
40use tracing::{instrument, trace, warn};
41
42/// A read guard providing access to a chain's [`ChainStateView`].
43///
44/// Holds a read lock on the chain worker state, preventing writes for its
45/// lifetime. The `OwnedRwLockReadGuard` internally holds a strong `Arc`
46/// reference to the `RwLock<ChainWorkerState>`, keeping the state alive.
47/// Dereferences to `ChainStateView`.
48pub struct ChainStateViewReadGuard<S: Storage>(
49    OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
50);
51
52impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
53    type Target = ChainStateView<S::Context>;
54
55    fn deref(&self) -> &Self::Target {
56        &self.0
57    }
58}
59
60/// Re-export of [`EventSubscriptionsResult`] for use by other crate modules.
61pub(crate) use crate::chain_worker::EventSubscriptionsResult;
62use crate::{
63    chain_worker::{
64        handle::{self, ServiceRuntimeActor},
65        state::ChainWorkerState,
66        BlockOutcome, ChainWorkerConfig, DeliveryNotifier,
67    },
68    client::ListeningMode,
69    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
70    notifier::Notifier,
71    value_cache::ValueCache,
72};
73
74#[cfg(test)]
75#[path = "unit_tests/worker_tests.rs"]
76mod worker_tests;
77
78/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds.
79/// On web targets the future is returned as-is.
80#[cfg(not(web))]
81pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
82    sync_wrapper::SyncFuture::new(f)
83}
84
85/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds.
86/// On web targets the future is returned as-is.
87#[cfg(web)]
88pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> F {
89    f
90}
91
92#[cfg(with_metrics)]
93mod metrics {
94    use std::sync::LazyLock;
95
96    use linera_base::prometheus_util::{
97        exponential_bucket_interval, register_histogram, register_histogram_vec,
98        register_int_counter, register_int_counter_vec,
99    };
100    use linera_chain::types::ConfirmedBlockCertificate;
101    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};
102
103    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
104        register_histogram_vec(
105            "num_rounds_in_certificate",
106            "Number of rounds in certificate",
107            &["certificate_value", "round_type"],
108            exponential_bucket_interval(0.1, 50.0),
109        )
110    });
111
112    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
113        register_histogram_vec(
114            "num_rounds_in_block_proposal",
115            "Number of rounds in block proposal",
116            &["round_type"],
117            exponential_bucket_interval(0.1, 50.0),
118        )
119    });
120
121    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
122        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
123
124    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
125        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));
126
127    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
128        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));
129
130    pub static OPERATION_COUNT: LazyLock<IntCounter> =
131        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));
132
133    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
134        register_histogram(
135            "operations_per_block",
136            "Number of operations per block",
137            exponential_bucket_interval(1.0, 10000.0),
138        )
139    });
140
141    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
142        register_histogram(
143            "incoming_bundles_per_block",
144            "Number of incoming bundles per block",
145            exponential_bucket_interval(1.0, 10000.0),
146        )
147    });
148
149    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
150        register_histogram(
151            "transactions_per_block",
152            "Number of transactions per block",
153            exponential_bucket_interval(1.0, 10000.0),
154        )
155    });
156
157    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
158        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
159    });
160
161    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
162        register_int_counter_vec(
163            "certificates_signed",
164            "Number of confirmed block certificates signed by each validator",
165            &["validator_name"],
166        )
167    });
168
169    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
170        register_int_counter(
171            "chain_info_queries",
172            "Number of chain info queries processed",
173        )
174    });
175
176    /// Holds metrics data extracted from a confirmed block certificate.
177    pub struct MetricsData {
178        certificate_log_str: &'static str,
179        round_type: &'static str,
180        round_number: u32,
181        confirmed_transactions: u64,
182        confirmed_incoming_bundles: u64,
183        confirmed_incoming_messages: u64,
184        confirmed_operations: u64,
185        validators_with_signatures: Vec<String>,
186    }
187
188    impl MetricsData {
189        /// Creates a new `MetricsData` by extracting data from the certificate.
190        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
191            Self {
192                certificate_log_str: certificate.inner().to_log_str(),
193                round_type: certificate.round.type_name(),
194                round_number: certificate.round.number(),
195                confirmed_transactions: certificate.block().body.transactions.len() as u64,
196                confirmed_incoming_bundles: certificate.block().body.incoming_bundles().count()
197                    as u64,
198                confirmed_incoming_messages: certificate
199                    .block()
200                    .body
201                    .incoming_bundles()
202                    .map(|b| b.messages().count())
203                    .sum::<usize>() as u64,
204                confirmed_operations: certificate.block().body.operations().count() as u64,
205                validators_with_signatures: certificate
206                    .signatures()
207                    .iter()
208                    .map(|(validator_name, _)| validator_name.to_string())
209                    .collect(),
210            }
211        }
212
213        /// Records the metrics for a processed block.
214        pub fn record(self) {
215            NUM_BLOCKS.with_label_values(&[]).inc();
216            NUM_ROUNDS_IN_CERTIFICATE
217                .with_label_values(&[self.certificate_log_str, self.round_type])
218                .observe(self.round_number as f64);
219            TRANSACTIONS_PER_BLOCK.observe(self.confirmed_transactions as f64);
220            INCOMING_BUNDLES_PER_BLOCK.observe(self.confirmed_incoming_bundles as f64);
221            OPERATIONS_PER_BLOCK.observe(self.confirmed_operations as f64);
222            if self.confirmed_transactions > 0 {
223                TRANSACTION_COUNT
224                    .with_label_values(&[])
225                    .inc_by(self.confirmed_transactions);
226                if self.confirmed_incoming_bundles > 0 {
227                    INCOMING_BUNDLE_COUNT.inc_by(self.confirmed_incoming_bundles);
228                }
229                if self.confirmed_incoming_messages > 0 {
230                    INCOMING_MESSAGE_COUNT.inc_by(self.confirmed_incoming_messages);
231                }
232                if self.confirmed_operations > 0 {
233                    OPERATION_COUNT.inc_by(self.confirmed_operations);
234                }
235            }
236
237            for validator_name in self.validators_with_signatures {
238                CERTIFICATES_SIGNED
239                    .with_label_values(&[&validator_name])
240                    .inc();
241            }
242        }
243    }
244}
245
246/// Instruct the networking layer to send cross-chain requests and/or push notifications.
247#[derive(Default, Debug)]
248pub struct NetworkActions {
249    /// The cross-chain requests
250    pub cross_chain_requests: Vec<CrossChainRequest>,
251    /// The push notifications.
252    pub notifications: Vec<Notification>,
253}
254
255impl NetworkActions {
256    pub fn extend(&mut self, other: NetworkActions) {
257        self.cross_chain_requests.extend(other.cross_chain_requests);
258        self.notifications.extend(other.notifications);
259    }
260}
261
262#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
263/// Notification that a chain has a new certified block or a new message.
264pub struct Notification {
265    pub chain_id: ChainId,
266    pub reason: Reason,
267}
268
269doc_scalar!(
270    Notification,
271    "Notify that a chain has a new certified block or a new message"
272);
273
274#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
275/// Reason for the notification.
276pub enum Reason {
277    NewBlock {
278        height: BlockHeight,
279        hash: CryptoHash,
280    },
281    NewEvents {
282        height: BlockHeight,
283        hash: CryptoHash,
284        event_streams: BTreeSet<StreamId>,
285    },
286    NewIncomingBundle {
287        origin: ChainId,
288        height: BlockHeight,
289    },
290    NewRound {
291        height: BlockHeight,
292        round: Round,
293    },
294    BlockExecuted {
295        height: BlockHeight,
296        hash: CryptoHash,
297    },
298}
299
300/// Error type for worker operations.
301#[derive(Debug, Error)]
302pub enum WorkerError {
303    #[error(transparent)]
304    CryptoError(#[from] CryptoError),
305
306    #[error(transparent)]
307    ArithmeticError(#[from] ArithmeticError),
308
309    #[error(transparent)]
310    ViewError(#[from] ViewError),
311
312    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
313    ReadCertificatesError(Vec<CryptoHash>),
314
315    #[error(transparent)]
316    ChainError(#[from] Box<ChainError>),
317
318    #[error(transparent)]
319    BcsError(#[from] bcs::Error),
320
321    // Chain access control
322    #[error("Block was not signed by an authorized owner")]
323    InvalidOwner,
324
325    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
326    InvalidSigner(AccountOwner),
327
328    // Chaining
329    #[error(
330        "Chain is expecting a next block at height {expected_block_height} but the given block \
331        is at height {found_block_height} instead"
332    )]
333    UnexpectedBlockHeight {
334        expected_block_height: BlockHeight,
335        found_block_height: BlockHeight,
336    },
337    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
338    InvalidEpoch {
339        chain_id: ChainId,
340        chain_epoch: Epoch,
341        epoch: Epoch,
342    },
343
344    #[error("Events not found: {0:?}")]
345    EventsNotFound(Vec<EventId>),
346
347    // Other server-side errors
348    #[error("Invalid cross-chain request")]
349    InvalidCrossChainRequest,
350    #[error("The block does not contain the hash that we expected for the previous block")]
351    InvalidBlockChaining,
352    #[error(
353        "The given outcome is not what we computed after executing the block.\n\
354        Computed: {computed:#?}\n\
355        Submitted: {submitted:#?}"
356    )]
357    IncorrectOutcome {
358        computed: Box<BlockExecutionOutcome>,
359        submitted: Box<BlockExecutionOutcome>,
360    },
361    #[error(
362        "Block timestamp ({block_timestamp}) is further in the future from local time \
363        ({local_time}) than block time grace period ({block_time_grace_period:?})"
364    )]
365    InvalidTimestamp {
366        block_timestamp: Timestamp,
367        local_time: Timestamp,
368        block_time_grace_period: Duration,
369    },
370    #[error("We don't have the value for the certificate.")]
371    MissingCertificateValue,
372    #[error("The hash certificate doesn't match its value.")]
373    InvalidLiteCertificate,
374    #[error("Fast blocks cannot query oracles")]
375    FastBlockUsingOracles,
376    #[error("Blobs not found: {0:?}")]
377    BlobsNotFound(Vec<BlobId>),
378    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
379    ConfirmedLogEntryNotFound {
380        height: BlockHeight,
381        chain_id: ChainId,
382    },
383    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
384    PreprocessedBlocksEntryNotFound {
385        height: BlockHeight,
386        chain_id: ChainId,
387    },
388    #[error("The block proposal is invalid: {0}")]
389    InvalidBlockProposal(String),
390    #[error("Blob was not required by any pending block")]
391    UnexpectedBlob,
392    #[error("Number of published blobs per block must not exceed {0}")]
393    TooManyPublishedBlobs(u64),
394    #[error("Missing network description")]
395    MissingNetworkDescription,
396    #[error("thread error: {0}")]
397    Thread(#[from] web_thread_pool::Error),
398}
399
400impl WorkerError {
401    /// Returns whether this error is caused by an issue in the local node.
402    ///
403    /// Returns `false` whenever the error could be caused by a bad message from a peer.
404    pub fn is_local(&self) -> bool {
405        match self {
406            WorkerError::CryptoError(_)
407            | WorkerError::ArithmeticError(_)
408            | WorkerError::InvalidOwner
409            | WorkerError::InvalidSigner(_)
410            | WorkerError::UnexpectedBlockHeight { .. }
411            | WorkerError::InvalidEpoch { .. }
412            | WorkerError::EventsNotFound(_)
413            | WorkerError::InvalidBlockChaining
414            | WorkerError::IncorrectOutcome { .. }
415            | WorkerError::InvalidTimestamp { .. }
416            | WorkerError::MissingCertificateValue
417            | WorkerError::InvalidLiteCertificate
418            | WorkerError::FastBlockUsingOracles
419            | WorkerError::BlobsNotFound(_)
420            | WorkerError::InvalidBlockProposal(_)
421            | WorkerError::UnexpectedBlob
422            | WorkerError::TooManyPublishedBlobs(_)
423            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
424            WorkerError::BcsError(_)
425            | WorkerError::InvalidCrossChainRequest
426            | WorkerError::ViewError(_)
427            | WorkerError::ConfirmedLogEntryNotFound { .. }
428            | WorkerError::PreprocessedBlocksEntryNotFound { .. }
429            | WorkerError::MissingNetworkDescription
430            | WorkerError::Thread(_)
431            | WorkerError::ReadCertificatesError(_) => true,
432            WorkerError::ChainError(chain_error) => chain_error.is_local(),
433        }
434    }
435}
436
437impl From<ChainError> for WorkerError {
438    #[instrument(level = "trace", skip(chain_error))]
439    fn from(chain_error: ChainError) -> Self {
440        match chain_error {
441            ChainError::ExecutionError(execution_error, context) => match *execution_error {
442                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
443                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
444                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
445                    execution_error,
446                    context,
447                ))),
448            },
449            error => Self::ChainError(Box::new(error)),
450        }
451    }
452}
453
454#[cfg(with_testing)]
455impl WorkerError {
456    /// Returns the inner [`ExecutionError`] in this error.
457    ///
458    /// # Panics
459    ///
460    /// If this is not caused by an [`ExecutionError`].
461    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
462        let WorkerError::ChainError(chain_error) = self else {
463            panic!("Expected an `ExecutionError`. Got: {self:#?}");
464        };
465
466        let ChainError::ExecutionError(execution_error, context) = *chain_error else {
467            panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
468        };
469
470        assert_eq!(context, expected_context);
471
472        *execution_error
473    }
474}
475
476type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
477type ChainWorkerMap<S> =
478    Arc<papaya::HashMap<ChainId, std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>>>;
479
480/// Starts a background task that periodically removes dead weak references
481/// from the chain handle map. The actual lifetime management is handled by
482/// each handle's keep-alive task.
483fn start_sweep<S: Storage + Clone + 'static>(
484    chain_workers: &ChainWorkerMap<S>,
485    config: &ChainWorkerConfig,
486) {
487    // Sweep at the smaller of the two TTLs. If both are None, workers
488    // live forever so there's nothing to sweep.
489    let interval = match (config.ttl, config.sender_chain_ttl) {
490        (None, None) => return,
491        (Some(d), None) | (None, Some(d)) => d,
492        (Some(a), Some(b)) => a.min(b),
493    };
494    let weak_map = Arc::downgrade(chain_workers);
495    linera_base::Task::spawn(async move {
496        loop {
497            linera_base::time::timer::sleep(interval).await;
498            let Some(map) = weak_map.upgrade() else {
499                break;
500            };
501            let guard = map.guard();
502            let dead_keys: Vec<ChainId> = map
503                .pin_owned()
504                .iter()
505                .filter(|(_, weak)| weak.strong_count() == 0)
506                .map(|(chain_id, _)| *chain_id)
507                .collect();
508            for chain_id in dead_keys {
509                map.remove(&chain_id, &guard);
510            }
511            drop(guard);
512            drop(map);
513        }
514    })
515    .forget();
516}
517
518/// State of a worker in a validator or a local node.
519pub struct WorkerState<StorageClient: Storage> {
520    /// Access to local persistent storage.
521    storage: StorageClient,
522    /// Configuration options for chain workers.
523    chain_worker_config: ChainWorkerConfig,
524    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
525    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
526    /// Chains tracked by a worker, along with their listening modes.
527    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
528    /// One-shot channels to notify callers when messages of a particular chain have been
529    /// delivered.
530    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
531    /// The cache of loaded chain workers. Stores weak references; each worker
532    /// manages its own lifetime via a keep-alive task. A background sweep
533    /// periodically removes dead entries.
534    chain_workers: ChainWorkerMap<StorageClient>,
535}
536
537impl<StorageClient> Clone for WorkerState<StorageClient>
538where
539    StorageClient: Storage + Clone,
540{
541    fn clone(&self) -> Self {
542        WorkerState {
543            storage: self.storage.clone(),
544            chain_worker_config: self.chain_worker_config.clone(),
545            block_cache: self.block_cache.clone(),
546            execution_state_cache: self.execution_state_cache.clone(),
547            chain_modes: self.chain_modes.clone(),
548            delivery_notifiers: self.delivery_notifiers.clone(),
549            chain_workers: self.chain_workers.clone(),
550        }
551    }
552}
553
554pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
555
556impl<StorageClient> WorkerState<StorageClient>
557where
558    StorageClient: Storage,
559{
560    #[instrument(level = "trace", skip(self))]
561    pub fn nickname(&self) -> &str {
562        &self.chain_worker_config.nickname
563    }
564
565    /// Returns the storage client so that it can be manipulated or queried.
566    #[instrument(level = "trace", skip(self))]
567    #[cfg(not(feature = "test"))]
568    pub(crate) fn storage_client(&self) -> &StorageClient {
569        &self.storage
570    }
571
572    /// Returns the storage client so that it can be manipulated or queried by tests in other
573    /// crates.
574    #[instrument(level = "trace", skip(self))]
575    #[cfg(feature = "test")]
576    pub fn storage_client(&self) -> &StorageClient {
577        &self.storage
578    }
579
580    #[instrument(level = "trace", skip(self, certificate))]
581    pub(crate) async fn full_certificate(
582        &self,
583        certificate: LiteCertificate<'_>,
584    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
585        let block = self
586            .block_cache
587            .get(&certificate.value.value_hash)
588            .ok_or(WorkerError::MissingCertificateValue)?;
589
590        match certificate.value.kind {
591            linera_chain::types::CertificateKind::Confirmed => {
592                let value = ConfirmedBlock::from_hashed(block);
593                Ok(Either::Left(
594                    certificate
595                        .with_value(value)
596                        .ok_or(WorkerError::InvalidLiteCertificate)?,
597                ))
598            }
599            linera_chain::types::CertificateKind::Validated => {
600                let value = ValidatedBlock::from_hashed(block);
601                Ok(Either::Right(
602                    certificate
603                        .with_value(value)
604                        .ok_or(WorkerError::InvalidLiteCertificate)?,
605                ))
606            }
607            _ => Err(WorkerError::InvalidLiteCertificate),
608        }
609    }
610}
611
612#[allow(async_fn_in_trait)]
613#[cfg_attr(not(web), trait_variant::make(Send))]
614pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
615    async fn process_certificate<S: Storage + Clone + 'static>(
616        worker: &WorkerState<S>,
617        certificate: GenericCertificate<Self>,
618    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
619}
620
621impl ProcessableCertificate for ConfirmedBlock {
622    async fn process_certificate<S: Storage + Clone + 'static>(
623        worker: &WorkerState<S>,
624        certificate: ConfirmedBlockCertificate,
625    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
626        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
627    }
628}
629
630impl ProcessableCertificate for ValidatedBlock {
631    async fn process_certificate<S: Storage + Clone + 'static>(
632        worker: &WorkerState<S>,
633        certificate: ValidatedBlockCertificate,
634    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
635        Box::pin(worker.handle_validated_certificate(certificate)).await
636    }
637}
638
639impl ProcessableCertificate for Timeout {
640    async fn process_certificate<S: Storage + Clone + 'static>(
641        worker: &WorkerState<S>,
642        certificate: TimeoutCertificate,
643    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
644        worker.handle_timeout_certificate(certificate).await
645    }
646}
647
648impl<StorageClient> WorkerState<StorageClient>
649where
650    StorageClient: Storage + Clone + 'static,
651{
652    /// Creates a new `WorkerState`.
653    ///
654    /// The `chain_worker_config` must be fully configured before calling this, because the
655    /// TTL sweep task is started immediately based on the config's TTL values.
656    #[instrument(level = "trace", skip(storage, chain_worker_config))]
657    pub fn new(
658        storage: StorageClient,
659        chain_worker_config: ChainWorkerConfig,
660        chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
661    ) -> Self {
662        let chain_workers = Arc::new(papaya::HashMap::new());
663        start_sweep(&chain_workers, &chain_worker_config);
664        let block_cache_size = chain_worker_config.block_cache_size;
665        let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
666        WorkerState {
667            storage,
668            chain_worker_config,
669            block_cache: Arc::new(ValueCache::new(block_cache_size)),
670            execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
671            chain_modes,
672            delivery_notifiers: Arc::default(),
673            chain_workers,
674        }
675    }
676
677    #[instrument(level = "trace", skip(self, certificate, notifier))]
678    #[inline]
679    pub async fn fully_handle_certificate_with_notifications<T>(
680        &self,
681        certificate: GenericCertificate<T>,
682        notifier: &impl Notifier,
683    ) -> Result<ChainInfoResponse, WorkerError>
684    where
685        T: ProcessableCertificate,
686    {
687        let notifications = (*notifier).clone();
688        let this = self.clone();
689        linera_base::Task::spawn(async move {
690            let (response, actions) =
691                ProcessableCertificate::process_certificate(&this, certificate).await?;
692            notifications.notify(&actions.notifications);
693            let mut requests = VecDeque::from(actions.cross_chain_requests);
694            while let Some(request) = requests.pop_front() {
695                let actions = this.handle_cross_chain_request(request).await?;
696                requests.extend(actions.cross_chain_requests);
697                notifications.notify(&actions.notifications);
698            }
699            Ok(response)
700        })
701        .await
702    }
703
704    /// Acquires a read lock on the chain worker and executes the given closure.
705    ///
706    /// The future is boxed to keep deeply nested types off the stack. On non-web
707    /// targets it is also wrapped in `SyncFuture` to satisfy `Sync` bounds.
708    async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
709    where
710        F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
711        Fut: std::future::Future<Output = Result<R, WorkerError>>,
712    {
713        let state = self.get_or_create_chain_worker(chain_id).await?;
714        Box::pin(wrap_future(async move {
715            let guard = handle::read_lock(&state).await;
716            f(guard).await
717        }))
718        .await
719    }
720
721    /// Acquires a write lock on the chain worker and executes the given closure.
722    ///
723    /// The [`RollbackGuard`] automatically rolls back uncommitted chain state changes
724    /// when dropped, ensuring cancellation safety. The future is boxed to keep deeply
725    /// nested types off the stack.
726    async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
727    where
728        F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut,
729        Fut: std::future::Future<Output = Result<R, WorkerError>>,
730    {
731        let state = self.get_or_create_chain_worker(chain_id).await?;
732        Box::pin(wrap_future(async move {
733            let guard = handle::write_lock(&state).await;
734            f(guard).await
735        }))
736        .await
737    }
738
739    /// Gets or creates a chain worker for the given chain.
740    ///
741    /// Returns a type-erased future to keep `!Sync` intermediate types (e.g.
742    /// `std::sync::mpsc::Receiver` from `ServiceRuntimeActor::spawn`) out of
743    /// the caller's future type.
744    fn get_or_create_chain_worker(
745        &self,
746        chain_id: ChainId,
747    ) -> std::pin::Pin<
748        Box<
749            impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
750        >,
751    > {
752        Box::pin(wrap_future(async move {
753            // Fast path: check if a live worker already exists.
754            {
755                let guard = self.chain_workers.pin();
756                if let Some(weak) = guard.get(&chain_id) {
757                    if let Some(strong) = weak.upgrade() {
758                        return Ok(strong);
759                    }
760                }
761            }
762
763            // Slow path: load the chain state (no lock held).
764            let delivery_notifier = self
765                .delivery_notifiers
766                .lock()
767                .unwrap()
768                .entry(chain_id)
769                .or_default()
770                .clone();
771
772            let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
773                chain_modes
774                    .read()
775                    .unwrap()
776                    .get(&chain_id)
777                    .is_some_and(ListeningMode::is_full)
778            });
779
780            let (service_runtime_task, service_runtime_endpoint) =
781                if self.chain_worker_config.long_lived_services {
782                    let actor =
783                        ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
784                    (Some(actor.task), Some(actor.endpoint))
785                } else {
786                    (None, None)
787                };
788
789            let state = crate::chain_worker::state::ChainWorkerState::load(
790                self.chain_worker_config.clone(),
791                self.storage.clone(),
792                self.block_cache.clone(),
793                self.execution_state_cache.clone(),
794                self.chain_modes.clone(),
795                delivery_notifier,
796                chain_id,
797                service_runtime_endpoint,
798                service_runtime_task,
799            )
800            .await?;
801
802            let worker = handle::create_chain_worker(state, is_tracked, &self.chain_worker_config);
803
804            // Atomically insert our entry, but only if no live worker exists yet.
805            let weak = Arc::downgrade(&worker);
806            let guard = self.chain_workers.guard();
807            match self.chain_workers.compute(
808                chain_id,
809                |existing| match existing {
810                    Some((_, e)) => match e.upgrade() {
811                        Some(live) => papaya::Operation::Abort(live),
812                        None => papaya::Operation::Insert(weak.clone()),
813                    },
814                    None => papaya::Operation::Insert(weak.clone()),
815                },
816                &guard,
817            ) {
818                papaya::Compute::Aborted(existing, ..) => Ok(existing),
819                papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => Ok(worker),
820                papaya::Compute::Removed { .. } => unreachable!(),
821            }
822        }))
823    }
824
825    /// Tries to execute a block proposal without any verification other than block execution.
826    #[instrument(level = "trace", skip(self, block))]
827    pub async fn stage_block_execution(
828        &self,
829        block: ProposedBlock,
830        round: Option<u32>,
831        published_blobs: Vec<Blob>,
832    ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
833        let chain_id = block.chain_id;
834        self.chain_write(chain_id, |mut guard| async move {
835            guard
836                .stage_block_execution(block, round, &published_blobs)
837                .await
838        })
839        .await
840    }
841
842    /// Tries to execute a block proposal with a policy for handling bundle failures.
843    ///
844    /// Returns the modified block (bundles may be rejected/removed), the executed block,
845    /// chain info response, and resource tracker.
846    #[instrument(level = "trace", skip(self, block))]
847    pub async fn stage_block_execution_with_policy(
848        &self,
849        block: ProposedBlock,
850        round: Option<u32>,
851        published_blobs: Vec<Blob>,
852        policy: BundleExecutionPolicy,
853    ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
854        let chain_id = block.chain_id;
855        self.chain_write(chain_id, |mut guard| async move {
856            guard
857                .stage_block_execution_with_policy(block, round, &published_blobs, policy)
858                .await
859        })
860        .await
861    }
862
863    /// Executes a [`Query`] for an application's state on a specific chain.
864    ///
865    /// If `block_hash` is specified, system will query the application's state
866    /// at that block. If it doesn't exist, it uses latest state.
867    #[instrument(level = "trace", skip(self, chain_id, query))]
868    pub async fn query_application(
869        &self,
870        chain_id: ChainId,
871        query: Query,
872        block_hash: Option<CryptoHash>,
873    ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
874        self.chain_write(chain_id, |mut guard| async move {
875            guard.query_application(query, block_hash).await
876        })
877        .await
878    }
879
880    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
881        nickname = %self.nickname(),
882        chain_id = %chain_id,
883        application_id = %application_id
884    ))]
885    pub async fn describe_application(
886        &self,
887        chain_id: ChainId,
888        application_id: ApplicationId,
889    ) -> Result<ApplicationDescription, WorkerError> {
890        let state = self.get_or_create_chain_worker(chain_id).await?;
891        let guard = handle::read_lock_initialized(&state).await?;
892        guard.describe_application_readonly(application_id).await
893    }
894
895    /// Processes a confirmed block (aka a commit).
896    #[instrument(
897        level = "trace",
898        skip(self, certificate, notify_when_messages_are_delivered),
899        fields(
900            nickname = %self.nickname(),
901            chain_id = %certificate.block().header.chain_id,
902            block_height = %certificate.block().header.height
903        )
904    )]
905    async fn process_confirmed_block(
906        &self,
907        certificate: ConfirmedBlockCertificate,
908        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
909    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
910        let chain_id = certificate.block().header.chain_id;
911        self.chain_write(chain_id, |mut guard| async move {
912            guard
913                .process_confirmed_block(certificate, notify_when_messages_are_delivered)
914                .await
915        })
916        .await
917    }
918
919    /// Processes a validated block issued from a multi-owner chain.
920    #[instrument(level = "trace", skip(self, certificate), fields(
921        nickname = %self.nickname(),
922        chain_id = %certificate.block().header.chain_id,
923        block_height = %certificate.block().header.height
924    ))]
925    async fn process_validated_block(
926        &self,
927        certificate: ValidatedBlockCertificate,
928    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
929        let chain_id = certificate.block().header.chain_id;
930        self.chain_write(chain_id, |mut guard| async move {
931            guard.process_validated_block(certificate).await
932        })
933        .await
934    }
935
936    /// Processes a leader timeout issued from a multi-owner chain.
937    #[instrument(level = "trace", skip(self, certificate), fields(
938        nickname = %self.nickname(),
939        chain_id = %certificate.value().chain_id(),
940        height = %certificate.value().height()
941    ))]
942    async fn process_timeout(
943        &self,
944        certificate: TimeoutCertificate,
945    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
946        let chain_id = certificate.value().chain_id();
947        self.chain_write(chain_id, |mut guard| async move {
948            guard.process_timeout(certificate).await
949        })
950        .await
951    }
952
953    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
954        nickname = %self.nickname(),
955        origin = %origin,
956        recipient = %recipient,
957        num_bundles = %bundles.len()
958    ))]
959    async fn process_cross_chain_update(
960        &self,
961        origin: ChainId,
962        recipient: ChainId,
963        bundles: Vec<(Epoch, MessageBundle)>,
964    ) -> Result<Option<BlockHeight>, WorkerError> {
965        self.chain_write(recipient, |mut guard| async move {
966            guard.process_cross_chain_update(origin, bundles).await
967        })
968        .await
969    }
970
971    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
972    #[instrument(level = "trace", skip(self, chain_id, height), fields(
973        nickname = %self.nickname(),
974        chain_id = %chain_id,
975        height = %height
976    ))]
977    #[cfg(with_testing)]
978    pub async fn read_certificate(
979        &self,
980        chain_id: ChainId,
981        height: BlockHeight,
982    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
983        let state = self.get_or_create_chain_worker(chain_id).await?;
984        let guard = handle::read_lock_initialized(&state).await?;
985        guard.read_certificate(height).await
986    }
987
988    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
989    /// [`ChainId`].
990    ///
991    /// The returned guard holds a read lock on the chain state, preventing writes for
992    /// its lifetime. Multiple concurrent readers are allowed.
993    #[instrument(level = "trace", skip(self), fields(
994        nickname = %self.nickname(),
995        chain_id = %chain_id
996    ))]
997    pub async fn chain_state_view(
998        &self,
999        chain_id: ChainId,
1000    ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1001        let state = self.get_or_create_chain_worker(chain_id).await?;
1002        let guard = handle::read_lock(&state).await;
1003        Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1004            guard,
1005            |s| s.chain(),
1006        )))
1007    }
1008
1009    #[instrument(skip_all, fields(
1010        nick = self.nickname(),
1011        chain_id = format!("{:.8}", proposal.content.block.chain_id),
1012        height = %proposal.content.block.height,
1013    ))]
1014    pub async fn handle_block_proposal(
1015        &self,
1016        proposal: BlockProposal,
1017    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1018        trace!("{} <-- {:?}", self.nickname(), proposal);
1019        #[cfg(with_metrics)]
1020        let round = proposal.content.round;
1021
1022        let chain_id = proposal.content.block.chain_id;
1023        // Delay if block timestamp is in the future but within grace period.
1024        let now = self.storage.clock().current_time();
1025        let block_timestamp = proposal.content.block.timestamp;
1026        let delta = block_timestamp.delta_since(now);
1027        let grace_period = TimeDelta::from_micros(
1028            u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1029                .unwrap_or(u64::MAX),
1030        );
1031        if delta > TimeDelta::ZERO && delta <= grace_period {
1032            self.storage.clock().sleep_until(block_timestamp).await;
1033        }
1034
1035        let response = self
1036            .chain_write(chain_id, |mut guard| async move {
1037                guard.handle_block_proposal(proposal).await
1038            })
1039            .await?;
1040        #[cfg(with_metrics)]
1041        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1042            .with_label_values(&[round.type_name()])
1043            .observe(round.number() as f64);
1044        Ok(response)
1045    }
1046
1047    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
1048    // Other fields will be included in the caller's span.
1049    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1050    pub async fn handle_lite_certificate(
1051        &self,
1052        certificate: LiteCertificate<'_>,
1053        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1054    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1055        match self.full_certificate(certificate).await? {
1056            Either::Left(confirmed) => {
1057                Box::pin(
1058                    self.handle_confirmed_certificate(
1059                        confirmed,
1060                        notify_when_messages_are_delivered,
1061                    ),
1062                )
1063                .await
1064            }
1065            Either::Right(validated) => {
1066                if let Some(notifier) = notify_when_messages_are_delivered {
1067                    // Nothing to wait for.
1068                    if let Err(()) = notifier.send(()) {
1069                        warn!("Failed to notify message delivery to caller");
1070                    }
1071                }
1072                Box::pin(self.handle_validated_certificate(validated)).await
1073            }
1074        }
1075    }
1076
1077    /// Processes a confirmed block certificate.
1078    #[instrument(skip_all, fields(
1079        nick = self.nickname(),
1080        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1081        height = %certificate.block().header.height,
1082    ))]
1083    pub async fn handle_confirmed_certificate(
1084        &self,
1085        certificate: ConfirmedBlockCertificate,
1086        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1087    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1088        trace!("{} <-- {:?}", self.nickname(), certificate);
1089        #[cfg(with_metrics)]
1090        let metrics_data = metrics::MetricsData::new(&certificate);
1091
1092        let (info, actions, _outcome) =
1093            Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1094                .await?;
1095
1096        #[cfg(with_metrics)]
1097        if matches!(_outcome, BlockOutcome::Processed) {
1098            metrics_data.record();
1099        }
1100        Ok((info, actions))
1101    }
1102
1103    /// Processes a validated block certificate.
1104    #[instrument(skip_all, fields(
1105        nick = self.nickname(),
1106        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1107        height = %certificate.block().header.height,
1108    ))]
1109    pub async fn handle_validated_certificate(
1110        &self,
1111        certificate: ValidatedBlockCertificate,
1112    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1113        trace!("{} <-- {:?}", self.nickname(), certificate);
1114
1115        #[cfg(with_metrics)]
1116        let round = certificate.round;
1117        #[cfg(with_metrics)]
1118        let cert_str = certificate.inner().to_log_str();
1119
1120        let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1121        #[cfg(with_metrics)]
1122        {
1123            if matches!(_outcome, BlockOutcome::Processed) {
1124                metrics::NUM_ROUNDS_IN_CERTIFICATE
1125                    .with_label_values(&[cert_str, round.type_name()])
1126                    .observe(round.number() as f64);
1127            }
1128        }
1129        Ok((info, actions))
1130    }
1131
1132    /// Processes a timeout certificate
1133    #[instrument(skip_all, fields(
1134        nick = self.nickname(),
1135        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1136        height = %certificate.inner().height(),
1137    ))]
1138    pub async fn handle_timeout_certificate(
1139        &self,
1140        certificate: TimeoutCertificate,
1141    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1142        trace!("{} <-- {:?}", self.nickname(), certificate);
1143        self.process_timeout(certificate).await
1144    }
1145
1146    #[instrument(skip_all, fields(
1147        nick = self.nickname(),
1148        chain_id = format!("{:.8}", query.chain_id)
1149    ))]
1150    pub async fn handle_chain_info_query(
1151        &self,
1152        query: ChainInfoQuery,
1153    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1154        trace!("{} <-- {:?}", self.nickname(), query);
1155        #[cfg(with_metrics)]
1156        metrics::CHAIN_INFO_QUERIES.inc();
1157        let chain_id = query.chain_id;
1158        let result = self
1159            .chain_write(chain_id, |mut guard| async move {
1160                guard.handle_chain_info_query(query).await
1161            })
1162            .await;
1163        trace!("{} --> {:?}", self.nickname(), result);
1164        result
1165    }
1166
1167    #[instrument(skip_all, fields(
1168        nick = self.nickname(),
1169        chain_id = format!("{:.8}", chain_id)
1170    ))]
1171    pub async fn download_pending_blob(
1172        &self,
1173        chain_id: ChainId,
1174        blob_id: BlobId,
1175    ) -> Result<Blob, WorkerError> {
1176        trace!(
1177            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1178            self.nickname()
1179        );
1180        let result = self
1181            .chain_read(chain_id, |guard| async move {
1182                guard.download_pending_blob(blob_id).await
1183            })
1184            .await;
1185        trace!(
1186            "{} --> {:?}",
1187            self.nickname(),
1188            result.as_ref().map(|_| blob_id)
1189        );
1190        result
1191    }
1192
1193    #[instrument(skip_all, fields(
1194        nick = self.nickname(),
1195        chain_id = format!("{:.8}", chain_id)
1196    ))]
1197    pub async fn handle_pending_blob(
1198        &self,
1199        chain_id: ChainId,
1200        blob: Blob,
1201    ) -> Result<ChainInfoResponse, WorkerError> {
1202        let blob_id = blob.id();
1203        trace!(
1204            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1205            self.nickname()
1206        );
1207        let result = self
1208            .chain_write(chain_id, |mut guard| async move {
1209                guard.handle_pending_blob(blob).await
1210            })
1211            .await;
1212        trace!(
1213            "{} --> {:?}",
1214            self.nickname(),
1215            result.as_ref().map(|_| blob_id)
1216        );
1217        result
1218    }
1219
1220    #[instrument(skip_all, fields(
1221        nick = self.nickname(),
1222        chain_id = format!("{:.8}", request.target_chain_id())
1223    ))]
1224    pub async fn handle_cross_chain_request(
1225        &self,
1226        request: CrossChainRequest,
1227    ) -> Result<NetworkActions, WorkerError> {
1228        trace!("{} <-- {:?}", self.nickname(), request);
1229        match request {
1230            CrossChainRequest::UpdateRecipient {
1231                sender,
1232                recipient,
1233                bundles,
1234            } => {
1235                let mut actions = NetworkActions::default();
1236                let origin = sender;
1237                let Some(height) = self
1238                    .process_cross_chain_update(origin, recipient, bundles)
1239                    .await?
1240                else {
1241                    return Ok(actions);
1242                };
1243                actions.notifications.push(Notification {
1244                    chain_id: recipient,
1245                    reason: Reason::NewIncomingBundle { origin, height },
1246                });
1247                actions
1248                    .cross_chain_requests
1249                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1250                        sender,
1251                        recipient,
1252                        latest_height: height,
1253                    });
1254                Ok(actions)
1255            }
1256            CrossChainRequest::ConfirmUpdatedRecipient {
1257                sender,
1258                recipient,
1259                latest_height,
1260            } => {
1261                self.chain_write(sender, |mut guard| async move {
1262                    guard
1263                        .confirm_updated_recipient(recipient, latest_height)
1264                        .await
1265                })
1266                .await?;
1267                Ok(NetworkActions::default())
1268            }
1269        }
1270    }
1271
1272    /// Updates the received certificate trackers to at least the given values.
1273    #[instrument(skip_all, fields(
1274        nickname = %self.nickname(),
1275        chain_id = %chain_id,
1276        num_trackers = %new_trackers.len()
1277    ))]
1278    pub async fn update_received_certificate_trackers(
1279        &self,
1280        chain_id: ChainId,
1281        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1282    ) -> Result<(), WorkerError> {
1283        self.chain_write(chain_id, |mut guard| async move {
1284            guard
1285                .update_received_certificate_trackers(new_trackers)
1286                .await
1287        })
1288        .await
1289    }
1290
1291    /// Gets preprocessed block hashes in a given height range.
1292    #[instrument(skip_all, fields(
1293        nickname = %self.nickname(),
1294        chain_id = %chain_id,
1295        start = %start,
1296        end = %end
1297    ))]
1298    pub async fn get_preprocessed_block_hashes(
1299        &self,
1300        chain_id: ChainId,
1301        start: BlockHeight,
1302        end: BlockHeight,
1303    ) -> Result<Vec<CryptoHash>, WorkerError> {
1304        self.chain_read(chain_id, |guard| async move {
1305            guard.get_preprocessed_block_hashes(start, end).await
1306        })
1307        .await
1308    }
1309
1310    /// Gets the next block height to receive from an inbox.
1311    #[instrument(skip_all, fields(
1312        nickname = %self.nickname(),
1313        chain_id = %chain_id,
1314        origin = %origin
1315    ))]
1316    pub async fn get_inbox_next_height(
1317        &self,
1318        chain_id: ChainId,
1319        origin: ChainId,
1320    ) -> Result<BlockHeight, WorkerError> {
1321        self.chain_read(chain_id, |guard| async move {
1322            guard.get_inbox_next_height(origin).await
1323        })
1324        .await
1325    }
1326
1327    /// Gets locking blobs for specific blob IDs.
1328    /// Returns `Ok(None)` if any of the blobs is not found.
1329    #[instrument(skip_all, fields(
1330        nickname = %self.nickname(),
1331        chain_id = %chain_id,
1332        num_blob_ids = %blob_ids.len()
1333    ))]
1334    pub async fn get_locking_blobs(
1335        &self,
1336        chain_id: ChainId,
1337        blob_ids: Vec<BlobId>,
1338    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1339        self.chain_read(chain_id, |guard| async move {
1340            guard.get_locking_blobs(blob_ids).await
1341        })
1342        .await
1343    }
1344
1345    /// Gets block hashes for the given heights.
1346    pub async fn get_block_hashes(
1347        &self,
1348        chain_id: ChainId,
1349        heights: Vec<BlockHeight>,
1350    ) -> Result<Vec<CryptoHash>, WorkerError> {
1351        self.chain_read(chain_id, |guard| async move {
1352            guard.get_block_hashes(heights).await
1353        })
1354        .await
1355    }
1356
1357    /// Gets proposed blobs from the manager for specified blob IDs.
1358    pub async fn get_proposed_blobs(
1359        &self,
1360        chain_id: ChainId,
1361        blob_ids: Vec<BlobId>,
1362    ) -> Result<Vec<Blob>, WorkerError> {
1363        self.chain_read(chain_id, |guard| async move {
1364            guard.get_proposed_blobs(blob_ids).await
1365        })
1366        .await
1367    }
1368
1369    /// Gets event subscriptions from the chain.
1370    pub async fn get_event_subscriptions(
1371        &self,
1372        chain_id: ChainId,
1373    ) -> Result<EventSubscriptionsResult, WorkerError> {
1374        self.chain_read(chain_id, |guard| async move {
1375            guard.get_event_subscriptions().await
1376        })
1377        .await
1378    }
1379
1380    /// Gets the next expected event index for a stream.
1381    pub async fn get_next_expected_event(
1382        &self,
1383        chain_id: ChainId,
1384        stream_id: StreamId,
1385    ) -> Result<Option<u32>, WorkerError> {
1386        self.chain_read(chain_id, |guard| async move {
1387            guard.get_next_expected_event(stream_id).await
1388        })
1389        .await
1390    }
1391
1392    /// Gets received certificate trackers.
1393    pub async fn get_received_certificate_trackers(
1394        &self,
1395        chain_id: ChainId,
1396    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1397        self.chain_read(chain_id, |guard| async move {
1398            guard.get_received_certificate_trackers().await
1399        })
1400        .await
1401    }
1402
1403    /// Gets tip state and outbox info for next_outbox_heights calculation.
1404    pub async fn get_tip_state_and_outbox_info(
1405        &self,
1406        chain_id: ChainId,
1407        receiver_id: ChainId,
1408    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1409        self.chain_read(chain_id, |guard| async move {
1410            guard.get_tip_state_and_outbox_info(receiver_id).await
1411        })
1412        .await
1413    }
1414
1415    /// Gets the next height to preprocess.
1416    pub async fn get_next_height_to_preprocess(
1417        &self,
1418        chain_id: ChainId,
1419    ) -> Result<BlockHeight, WorkerError> {
1420        self.chain_read(chain_id, |guard| async move {
1421            guard.get_next_height_to_preprocess().await
1422        })
1423        .await
1424    }
1425}
1426
1427#[cfg(with_testing)]
1428impl<StorageClient> WorkerState<StorageClient>
1429where
1430    StorageClient: Storage + Clone + 'static,
1431{
1432    /// Gets a reference to the validator's [`ValidatorPublicKey`].
1433    ///
1434    /// # Panics
1435    ///
1436    /// If the validator doesn't have a key pair assigned to it.
1437    #[instrument(level = "trace", skip(self))]
1438    pub fn public_key(&self) -> ValidatorPublicKey {
1439        self.chain_worker_config
1440            .key_pair()
1441            .expect(
1442                "Test validator should have a key pair assigned to it \
1443                in order to obtain its public key",
1444            )
1445            .public()
1446    }
1447}