Skip to main content

linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7    future::Future,
8    pin,
9    sync::{Arc, Mutex, RwLock},
10    time::Duration,
11};
12
13use futures::{
14    future::{self, Either, Shared, WeakShared},
15    FutureExt as _,
16};
17use linera_base::{
18    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
19    data_types::{
20        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
21        Timestamp,
22    },
23    doc_scalar,
24    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
25};
26use linera_cache::{Arc as CacheArc, UniqueValueCache, ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
27#[cfg(with_testing)]
28use linera_chain::ChainExecutionContext;
29use linera_chain::{
30    data_types::{BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock},
31    types::{
32        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
33        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
34    },
35    ChainError, ChainStateView,
36};
37use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
38use linera_storage::{Clock as _, Storage};
39use linera_views::{context::InactiveContext, ViewError};
40use serde::{Deserialize, Serialize};
41use thiserror::Error;
42use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
43use tracing::{debug, instrument, trace, warn};
44
/// Read-only handle to a chain's [`ChainStateView`].
///
/// Wraps a tokio `OwnedRwLockReadGuard` over the chain worker state, mapped to the
/// `ChainStateView` it contains. While this value is alive the read lock is held,
/// preventing writes. The `OwnedRwLockReadGuard` internally holds a strong `Arc`
/// reference to the `RwLock<ChainWorkerState>`, keeping the state alive.
/// Dereferences to `ChainStateView`.
pub struct ChainStateViewReadGuard<S: Storage>(
    OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
);
54
55impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
56    type Target = ChainStateView<S::Context>;
57
58    fn deref(&self) -> &Self::Target {
59        &self.0
60    }
61}
62
63/// Re-export of [`EventSubscriptionsResult`] for use by other crate modules.
64pub(crate) use crate::chain_worker::EventSubscriptionsResult;
65use crate::{
66    chain_worker::{
67        handle,
68        state::{send_result, ChainWorkerState},
69        BlockOutcome, ChainWorkerConfig, CrossChainUpdateResult, DeliveryNotifier,
70        ProcessConfirmedBlockMode,
71    },
72    client::{ChainModes, ListeningMode},
73    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
74    notifier::Notifier,
75};
76
/// Default size of the block cache.
// NOTE(review): presumably the number of entries used for `WorkerState::block_cache`;
// the construction site is not visible here — confirm.
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000;
/// Default size of the execution state cache.
// NOTE(review): presumably the number of entries used for
// `WorkerState::execution_state_cache` — confirm at the construction site.
pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
79
80#[cfg(test)]
81#[path = "unit_tests/worker_tests.rs"]
82mod worker_tests;
83
84#[cfg(all(test, feature = "rocksdb"))]
85#[path = "unit_tests/worker_backup_tests.rs"]
86mod worker_backup_tests;
87
88/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds.
89/// On web targets the future is returned as-is.
90#[cfg(not(web))]
91pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
92    sync_wrapper::SyncFuture::new(f)
93}
94
/// Web variant: returns the future as-is. On non-web targets the same-named
/// function wraps the future in `SyncFuture` so that it satisfies `Sync` bounds.
#[cfg(web)]
pub(crate) fn wrap_future<F>(f: F) -> F
where
    F: std::future::Future,
{
    f
}
101
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics recorded by the worker. Every metric is registered lazily
    //! on first access.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec,
    };
    use linera_chain::{data_types::MessageAction, types::ConfirmedBlockCertificate};
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of the round number of certificates, labelled by certificate value
    /// and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number of block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of rejected bundles.
    pub static REJECTED_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("rejected_bundle_count", "Rejected bundle count"));

    /// Running total of incoming messages.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Histogram of the number of operations per block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Histogram of the number of incoming bundles per block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Histogram of the number of transactions per block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Running total of blocks added to chains.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Number of confirmed block certificates signed, labelled by validator name.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Running total of processed chain info queries.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Histogram of how many cross-chain requests were coalesced into one per-chain batch.
    pub static CROSS_CHAIN_BATCH_SIZE: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "cross_chain_batch_size",
            "Number of cross-chain requests coalesced into a single per-chain batch",
            exponential_bucket_interval(1.0, 1000.0),
        )
    });

    /// Snapshot of the values needed to update the block metrics, taken from a
    /// confirmed block certificate.
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_rejected_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Extracts the metric values from `certificate` without touching any metric.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;

            let transaction_total = body.transactions.len();
            let bundle_total = body.incoming_bundles().count();
            let rejected_total = body
                .incoming_bundles()
                .filter(|bundle| bundle.action == MessageAction::Reject)
                .count();
            let message_total = body
                .incoming_bundles()
                .map(|bundle| bundle.messages().count())
                .sum::<usize>();
            let operation_total = body.operations().count();
            let signers = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: certificate.round.type_name(),
                round_number: certificate.round.number(),
                confirmed_transactions: transaction_total as u64,
                confirmed_incoming_bundles: bundle_total as u64,
                confirmed_rejected_bundles: rejected_total as u64,
                confirmed_incoming_messages: message_total as u64,
                confirmed_operations: operation_total as u64,
                validators_with_signatures: signers,
            }
        }

        /// Consumes the snapshot and updates the block metrics with it.
        pub fn record(self) {
            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_rejected_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(f64::from(round_number));
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            // The running totals are only touched for non-empty blocks, and each
            // counter only when it has something to add.
            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                if confirmed_incoming_bundles > 0 {
                    INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
                }
                if confirmed_rejected_bundles > 0 {
                    REJECTED_BUNDLE_COUNT.inc_by(confirmed_rejected_bundles);
                }
                if confirmed_incoming_messages > 0 {
                    INCOMING_MESSAGE_COUNT.inc_by(confirmed_incoming_messages);
                }
                if confirmed_operations > 0 {
                    OPERATION_COUNT.inc_by(confirmed_operations);
                }
            }

            validators_with_signatures
                .into_iter()
                .for_each(|validator_name| {
                    CERTIFICATES_SIGNED
                        .with_label_values(&[&validator_name])
                        .inc();
                });
        }
    }
}
276
/// Instructions for the networking layer: cross-chain requests to send and/or push
/// notifications to deliver.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// The cross-chain requests.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// The push notifications.
    pub notifications: Vec<Notification>,
}
285
286impl NetworkActions {
287    pub fn extend(&mut self, other: NetworkActions) {
288        self.cross_chain_requests.extend(other.cross_chain_requests);
289        self.notifications.extend(other.notifications);
290    }
291}
292
/// Notification that a chain has a new certified block or a new message.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Notification {
    /// The chain this notification is about.
    pub chain_id: ChainId,
    /// Why the notification was emitted.
    pub reason: Reason,
}

doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
304
/// Reason for the notification.
// NOTE(review): the variant docs below only restate what the variant and field names
// show; the exact conditions under which each one is emitted are decided elsewhere.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Reason {
    /// A new block, identified by its height and hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// New events in the listed streams, from the block with the given height and hash.
    NewEvents {
        height: BlockHeight,
        block_hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
    /// A new incoming bundle from chain `origin` at the given height.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round at the given height.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
    /// A block, identified by its height and hash, was executed.
    BlockExecuted {
        height: BlockHeight,
        hash: CryptoHash,
    },
}
330
/// Error type for worker operations.
///
/// `strum::IntoStaticStr` provides the variant name as a `&'static str`, which
/// `WorkerError::error_type` uses to build the metric label.
#[derive(Debug, Error, strum::IntoStaticStr)]
pub enum WorkerError {
    // Errors forwarded from lower layers
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error("Certificates referenced from chain state are missing in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// A chain error, boxed. See also the hand-written `From<ChainError>`
    /// conversion, which lifts some execution errors into dedicated variants.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    // Chain access control
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
        ({local_time}) than block time grace period ({block_time_grace_period:?})"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period: Duration,
    },
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    /// Variant raised when the chain references these block hashes via a
    /// verified-checkpoint trust mark (`pre_checkpoint_block_trust`) but the
    /// actual content isn't in storage yet. The caller is expected to upload
    /// each missing block via `handle_confirmed_certificate`; the trust-mark
    /// accept path verifies the cert against its own (possibly revoked)
    /// epoch's committee and writes it through.
    #[error("Blocks not found: {0:?}")]
    BlocksNotFound(Vec<CryptoHash>),
    #[error("Block hash at height {height} for chain {chain_id} not found")]
    BlockHashNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("Block at height {height} on chain {chain_id} not found in local storage")]
    LocalBlockNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error("Missing network description")]
    MissingNetworkDescription,
    #[error("thread error: {0}")]
    Thread(#[from] web_thread_pool::Error),
    #[error("Chain worker was poisoned by a journal resolution failure")]
    PoisonedWorker,
    #[error("Cross-chain batch was rolled back due to an error in another request")]
    BatchRolledBack,
}
433
434impl WorkerError {
435    /// Returns whether this error is caused by an issue in the local node.
436    ///
437    /// Returns `false` whenever the error could be caused by a bad message from a peer.
438    pub fn is_local(&self) -> bool {
439        match self {
440            WorkerError::CryptoError(_)
441            | WorkerError::ArithmeticError(_)
442            | WorkerError::InvalidOwner
443            | WorkerError::InvalidSigner(_)
444            | WorkerError::UnexpectedBlockHeight { .. }
445            | WorkerError::InvalidEpoch { .. }
446            | WorkerError::EventsNotFound(_)
447            | WorkerError::InvalidBlockChaining
448            | WorkerError::InvalidTimestamp { .. }
449            | WorkerError::MissingCertificateValue
450            | WorkerError::InvalidLiteCertificate
451            | WorkerError::FastBlockUsingOracles
452            | WorkerError::BlobsNotFound(_)
453            | WorkerError::BlocksNotFound(_)
454            | WorkerError::InvalidBlockProposal(_)
455            | WorkerError::UnexpectedBlob
456            | WorkerError::TooManyPublishedBlobs(_)
457            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
458            WorkerError::BcsError(_)
459            | WorkerError::InvalidCrossChainRequest
460            | WorkerError::ViewError(_)
461            | WorkerError::BlockHashNotFound { .. }
462            | WorkerError::LocalBlockNotFound { .. }
463            | WorkerError::MissingNetworkDescription
464            | WorkerError::Thread(_)
465            | WorkerError::ReadCertificatesError(_)
466            | WorkerError::PoisonedWorker
467            | WorkerError::BatchRolledBack => true,
468            WorkerError::ChainError(chain_error) => chain_error.is_local(),
469        }
470    }
471
472    /// Returns the qualified error variant name for the `error_type` metric label,
473    /// e.g. `"WorkerError::UnexpectedBlockHeight"`.
474    ///
475    /// For `ChainError` variants, delegates to `ChainError::error_type()` to
476    /// surface the underlying error name rather than just `"ChainError"`.
477    pub fn error_type(&self) -> String {
478        match self {
479            WorkerError::ChainError(chain_error) => chain_error.error_type(),
480            other => {
481                let variant: &'static str = other.into();
482                format!("WorkerError::{variant}")
483            }
484        }
485    }
486
487    /// Returns `true` if this error indicates that the chain worker's in-memory
488    /// state may be inconsistent and must be evicted from the cache.
489    fn must_reload_view(&self) -> bool {
490        matches!(
491            self,
492            WorkerError::PoisonedWorker
493                | WorkerError::ViewError(ViewError::StoreError {
494                    must_reload_view: true,
495                    ..
496                })
497        )
498    }
499
500    /// Returns `true` if this error indicates that the chain's persisted state is
501    /// internally inconsistent, so the worker should consider resetting and
502    /// re-executing it from storage.
503    fn indicates_corrupted_chain_state(&self) -> bool {
504        matches!(
505            self,
506            WorkerError::ChainError(chain_error)
507                if matches!(chain_error.as_ref(), ChainError::CorruptedChainState(_))
508        )
509    }
510}
511
512impl From<ChainError> for WorkerError {
513    #[instrument(level = "trace", skip(chain_error))]
514    fn from(chain_error: ChainError) -> Self {
515        match chain_error {
516            ChainError::ExecutionError(execution_error, context) => match *execution_error {
517                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
518                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
519                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
520                    execution_error,
521                    context,
522                ))),
523            },
524            error => Self::ChainError(Box::new(error)),
525        }
526    }
527}
528
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] carried by this error, checking that it was
    /// raised in `expected_context`.
    ///
    /// # Panics
    ///
    /// If this is not caused by an [`ExecutionError`], or if the context differs from
    /// `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
550
/// Strong, shared handle to a chain worker state behind a tokio `RwLock`.
type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Weak counterpart of [`ChainWorkerArc`]; does not keep the worker state alive.
type ChainWorkerWeak<S> = std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Cloneable future that resolves to the weak worker handle sent over a oneshot channel.
type ChainWorkerFuture<S> = Shared<oneshot::Receiver<ChainWorkerWeak<S>>>;

/// Map from chain ID to the (possibly still loading) chain worker.
///
/// Each map entry is a `Shared<oneshot::Receiver<Weak<...>>>`:
///
/// - `peek()` returns `None` while a task is loading the worker from storage.
/// - `peek()` returns `Some(Ok(weak))` once the worker is loaded.
/// - `peek()` returns `Some(Err(_))` if loading failed (sender dropped).
///
/// Callers that find a pending entry clone the `Shared` future and await it.
type ChainWorkerMap<S> = Arc<papaya::HashMap<ChainId, ChainWorkerFuture<S>>>;
563
/// A cross-chain request waiting to be processed in a batch.
///
/// Each variant carries a oneshot `result_sender` on which the outcome of that
/// individual request is reported.
// NOTE(review): the field meanings below are read off the names and types only —
// confirm against `ChainWorkerState::process_batch`.
pub(crate) enum BatchRequest {
    /// Message bundles from chain `origin`, each paired with an epoch.
    Update {
        origin: ChainId,
        bundles: Vec<(Epoch, MessageBundle)>,
        previous_height: Option<BlockHeight>,
        result_sender: oneshot::Sender<Result<CrossChainUpdateResult, WorkerError>>,
    },
    /// A confirmation concerning chain `recipient` up to `latest_height`.
    Confirm {
        recipient: ChainId,
        latest_height: BlockHeight,
        result_sender: oneshot::Sender<Result<NetworkActions, WorkerError>>,
    },
}
578
/// The inner future type for cross-chain batch processing.
///
/// Wrapped in `Shared<BatchFuture>` so that all tasks waiting for
/// cross-chain operations on the same chain can cooperatively poll a
/// single driver. The driver loops: wait for an item from the request channel, acquire the
/// write lock, drain the channel, process all requests in one batch, repeat.
///
/// This is the non-web definition, which additionally requires `Send`.
#[cfg(not(web))]
type BatchFuture = pin::Pin<Box<dyn Future<Output = ()> + Send>>;
/// Web definition of the cross-chain batch driver future: the same boxed future,
/// without the `Send` bound.
#[cfg(web)]
type BatchFuture = pin::Pin<Box<dyn Future<Output = ()>>>;
589
/// Per-chain entry point for batched cross-chain requests: a channel into the batch
/// driver plus a weak handle to the driver future itself.
#[derive(Clone)]
struct ChainBatchRequestProcessor {
    /// Sender half of the channel whose receiver lives inside the driver future.
    sender: mpsc::UnboundedSender<BatchRequest>,
    /// Weak handle to the shared driver future. Upgraded to `Shared<BatchFuture>`
    /// by callers who need to poll it.
    future: WeakShared<BatchFuture>,
}
598
599impl ChainBatchRequestProcessor {
600    fn create<StorageClient>(
601        state: ChainWorkerArc<StorageClient>,
602        batch_size_limit: usize,
603    ) -> (ChainBatchRequestProcessor, Shared<BatchFuture>)
604    where
605        StorageClient: Storage + Clone + 'static,
606    {
607        let (sender, mut receiver) = mpsc::unbounded_channel();
608        let future: BatchFuture = Box::pin(async move {
609            while let Some(first) = receiver.recv().await {
610                let mut requests = vec![first];
611                match handle::write_lock(&state).await {
612                    Ok(mut guard) => {
613                        while requests.len() < batch_size_limit {
614                            match receiver.try_recv() {
615                                Ok(request) => requests.push(request),
616                                Err(_) => break,
617                            }
618                        }
619                        #[cfg(with_metrics)]
620                        metrics::CROSS_CHAIN_BATCH_SIZE.observe(requests.len() as f64);
621                        guard.process_batch(requests).await
622                    }
623                    Err(error) => {
624                        tracing::error!(%error, "failed to obtain write lock");
625                        for request in requests {
626                            match request {
627                                BatchRequest::Update { result_sender, .. } => {
628                                    send_result(result_sender, Err(WorkerError::PoisonedWorker));
629                                }
630                                BatchRequest::Confirm { result_sender, .. } => {
631                                    send_result(result_sender, Err(WorkerError::PoisonedWorker));
632                                }
633                            }
634                        }
635                    }
636                }
637            }
638        });
639        let shared = future.shared();
640        let weak = shared.downgrade().expect("future has not been polled yet");
641        let batch_processor = ChainBatchRequestProcessor {
642            sender,
643            future: weak,
644        };
645        (batch_processor, shared)
646    }
647}
648
/// Shared concurrent map from chain ID to that chain's batch request processor.
type ChainBatchMap = Arc<papaya::HashMap<ChainId, ChainBatchRequestProcessor>>;
650
651/// Starts a background task that periodically removes dead weak references
652/// from the chain handle map. The actual lifetime management is handled by
653/// each handle's keep-alive task.
654fn start_sweep<S: Storage + Clone + 'static>(
655    chain_workers: &ChainWorkerMap<S>,
656    config: &ChainWorkerConfig,
657) {
658    // Sweep at the smaller of the two TTLs. If both are None, workers
659    // live forever so there's nothing to sweep.
660    let interval = match (config.ttl, config.sender_chain_ttl) {
661        (None, None) => return,
662        (Some(d), None) | (None, Some(d)) => d,
663        (Some(a), Some(b)) => a.min(b),
664    };
665    let weak_map = Arc::downgrade(chain_workers);
666    linera_base::Task::spawn(async move {
667        loop {
668            linera_base::time::timer::sleep(interval).await;
669            let Some(map) = weak_map.upgrade() else {
670                break;
671            };
672            map.pin_owned().retain(|_, shared| match shared.peek() {
673                Some(Ok(weak)) => weak.strong_count() > 0,
674                Some(Err(_)) => false, // Loading failed; clean up.
675                None => true,          // Still loading; keep.
676            });
677        }
678    })
679    .forget();
680}
681
/// State of a worker in a validator or a local node.
pub struct WorkerState<StorageClient: Storage> {
    /// Access to local persistent storage.
    storage: StorageClient,
    /// Configuration options for chain workers.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of confirmed blocks keyed by hash; `full_certificate` looks up
    /// certificate values here.
    block_cache: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Optional cache of inactive execution state views keyed by a `CryptoHash`.
    // NOTE(review): which hash is used as the key is not visible here — confirm.
    execution_state_cache:
        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
    /// Chains tracked by a worker, along with their listening modes.
    pub(crate) chain_modes: Option<Arc<RwLock<ChainModes>>>,
    /// One-shot channels to notify callers when messages of a particular chain have been
    /// delivered.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The cache of loaded chain workers. Stores weak references; each worker
    /// manages its own lifetime via a keep-alive task. A background sweep
    /// periodically removes dead entries.
    chain_workers: ChainWorkerMap<StorageClient>,
    /// Per-chain batch processing state for cross-chain requests.
    chain_batches: ChainBatchMap,
    /// Shard-routing dispatcher for outbound cross-chain requests. Used when we need
    /// to send cross-chain requests outside of the normal `NetworkActions` return
    /// path — in particular, the `RevertConfirm`s emitted after resetting a
    /// corrupted chain. The RPC server layer installs this; without it, we fall
    /// back to dispatching locally through `handle_cross_chain_request`.
    outbound_cross_chain_sender: Option<OutboundCrossChainSender>,
}
709
/// Dispatcher for outbound cross-chain requests that handles the source-shard-to-
/// target-shard routing that the worker itself is not aware of.
///
/// A shared, thread-safe callback that takes ownership of one request and returns
/// nothing.
pub type OutboundCrossChainSender = Arc<dyn Fn(CrossChainRequest) + Send + Sync>;
713
714impl<StorageClient> Clone for WorkerState<StorageClient>
715where
716    StorageClient: Storage + Clone,
717{
718    fn clone(&self) -> Self {
719        WorkerState {
720            storage: self.storage.clone(),
721            chain_worker_config: self.chain_worker_config.clone(),
722            block_cache: self.block_cache.clone(),
723            execution_state_cache: self.execution_state_cache.clone(),
724            chain_modes: self.chain_modes.clone(),
725            delivery_notifiers: self.delivery_notifiers.clone(),
726            chain_workers: self.chain_workers.clone(),
727            chain_batches: self.chain_batches.clone(),
728            outbound_cross_chain_sender: self.outbound_cross_chain_sender.clone(),
729        }
730    }
731}
732
/// Map from chain ID to that chain's [`DeliveryNotifier`].
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
734
735impl<StorageClient> WorkerState<StorageClient>
736where
737    StorageClient: Storage,
738{
739    /// Returns an instance with the specified cross-chain message chunk limit.
740    #[cfg(with_testing)]
741    #[instrument(level = "trace", skip(self))]
742    pub fn with_cross_chain_message_chunk_limit(mut self, limit: usize) -> Self {
743        self.chain_worker_config.cross_chain_message_chunk_limit = limit;
744        self
745    }
746
747    /// Sets the cross-chain message chunk limit.
748    #[cfg(with_testing)]
749    pub fn set_cross_chain_message_chunk_limit(&mut self, limit: usize) {
750        self.chain_worker_config.cross_chain_message_chunk_limit = limit;
751    }
752
753    #[cfg(with_testing)]
754    #[instrument(level = "trace", skip(self, value))]
755    pub fn with_allow_revert_confirm(mut self, value: bool) -> Self {
756        self.chain_worker_config.allow_revert_confirm = value;
757        self
758    }
759
760    #[instrument(level = "trace", skip(self))]
761    pub fn nickname(&self) -> &str {
762        &self.chain_worker_config.nickname
763    }
764
765    /// Returns the storage client so that it can be manipulated or queried.
766    #[instrument(level = "trace", skip(self))]
767    #[cfg(not(feature = "test"))]
768    pub(crate) fn storage_client(&self) -> &StorageClient {
769        &self.storage
770    }
771
772    /// Returns the storage client so that it can be manipulated or queried by tests in other
773    /// crates.
774    #[instrument(level = "trace", skip(self))]
775    #[cfg(feature = "test")]
776    pub fn storage_client(&self) -> &StorageClient {
777        &self.storage
778    }
779
780    #[instrument(level = "trace", skip(self, certificate))]
781    pub(crate) async fn full_certificate(
782        &self,
783        certificate: LiteCertificate<'_>,
784    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
785        let block = self
786            .block_cache
787            .get(&certificate.value.value_hash)
788            .ok_or(WorkerError::MissingCertificateValue)?;
789        let block = CacheArc::unwrap_or_clone(block);
790
791        match certificate.value.kind {
792            linera_chain::types::CertificateKind::Confirmed => Ok(Either::Left(
793                certificate
794                    .with_value(block)
795                    .ok_or(WorkerError::InvalidLiteCertificate)?,
796            )),
797            linera_chain::types::CertificateKind::Validated => {
798                let value = ValidatedBlock::from_hashed(block.into_inner());
799                Ok(Either::Right(
800                    certificate
801                        .with_value(value)
802                        .ok_or(WorkerError::InvalidLiteCertificate)?,
803                ))
804            }
805            _ => Err(WorkerError::InvalidLiteCertificate),
806        }
807    }
808}
809
/// A certificate value whose certificates can be processed by a [`WorkerState`].
///
/// On non-web targets `trait_variant::make(Send)` is applied to the trait.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` on `worker`, returning the chain info response together
    /// with the network actions produced by the processing.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
818
819impl ProcessableCertificate for ConfirmedBlock {
820    async fn process_certificate<S: Storage + Clone + 'static>(
821        worker: &WorkerState<S>,
822        certificate: ConfirmedBlockCertificate,
823    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
824        Box::pin(worker.handle_confirmed_certificate(
825            certificate,
826            ProcessConfirmedBlockMode::Auto,
827            None,
828        ))
829        .await
830    }
831}
832
833impl ProcessableCertificate for ValidatedBlock {
834    async fn process_certificate<S: Storage + Clone + 'static>(
835        worker: &WorkerState<S>,
836        certificate: ValidatedBlockCertificate,
837    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
838        Box::pin(worker.handle_validated_certificate(certificate)).await
839    }
840}
841
842impl ProcessableCertificate for Timeout {
843    async fn process_certificate<S: Storage + Clone + 'static>(
844        worker: &WorkerState<S>,
845        certificate: TimeoutCertificate,
846    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
847        worker.handle_timeout_certificate(certificate).await
848    }
849}
850
851impl<StorageClient> WorkerState<StorageClient>
852where
853    StorageClient: Storage + Clone + 'static,
854{
855    /// Creates a new `WorkerState`.
856    ///
857    /// The `chain_worker_config` must be fully configured before calling this, because the
858    /// TTL sweep task is started immediately based on the config's TTL values.
859    #[instrument(level = "trace", skip(storage, chain_worker_config))]
860    pub fn new(
861        storage: StorageClient,
862        chain_worker_config: ChainWorkerConfig,
863        chain_modes: Option<Arc<RwLock<ChainModes>>>,
864    ) -> Self {
865        let chain_workers = Arc::new(papaya::HashMap::new());
866        start_sweep(&chain_workers, &chain_worker_config);
867        let block_cache_size = chain_worker_config.block_cache_size;
868        let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
869        WorkerState {
870            storage,
871            chain_worker_config,
872            block_cache: Arc::new(ValueCache::new(
873                "worker_block",
874                block_cache_size,
875                DEFAULT_CLEANUP_INTERVAL_SECS,
876            )),
877            execution_state_cache: (execution_state_cache_size > 0)
878                .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))),
879            chain_modes,
880            delivery_notifiers: Arc::default(),
881            chain_workers,
882            // On wasm, `ChainBatchRequestProcessor` is not `Send`/`Sync` (the inner future
883            // isn't `Send`), but `Arc` is still correct: `WorkerState` clones
884            // share this map and wasm is single-threaded.
885            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
886            chain_batches: Arc::new(papaya::HashMap::new()),
887            outbound_cross_chain_sender: None,
888        }
889    }
890
891    /// Installs a shard-routing dispatcher used for outbound cross-chain requests
892    /// generated outside the normal response path (specifically, after resetting a
893    /// corrupted chain). Without it, such requests are dispatched locally in a
894    /// loop via `handle_cross_chain_request`.
895    pub fn with_outbound_cross_chain_sender(mut self, sender: OutboundCrossChainSender) -> Self {
896        self.outbound_cross_chain_sender = Some(sender);
897        self
898    }
899
900    #[instrument(level = "trace", skip(self, certificate, notifier))]
901    #[inline]
902    pub async fn fully_handle_certificate_with_notifications<T>(
903        &self,
904        certificate: GenericCertificate<T>,
905        notifier: &impl Notifier,
906    ) -> Result<ChainInfoResponse, WorkerError>
907    where
908        T: ProcessableCertificate,
909    {
910        let notifications = (*notifier).clone();
911        let this = self.clone();
912        linera_base::Task::spawn(async move {
913            let (response, actions) =
914                ProcessableCertificate::process_certificate(&this, certificate).await?;
915            notifications.notify(&actions.notifications);
916            let mut requests = VecDeque::from(actions.cross_chain_requests);
917            while let Some(request) = requests.pop_front() {
918                let actions = this.handle_cross_chain_request(request).await?;
919                requests.extend(actions.cross_chain_requests);
920                notifications.notify(&actions.notifications);
921            }
922            Ok(response)
923        })
924        .await
925    }
926
927    /// Same as [`Self::fully_handle_certificate_with_notifications`] but for a
928    /// confirmed block certificate and with an explicit [`ProcessConfirmedBlockMode`].
929    /// The generic variant always uses [`ProcessConfirmedBlockMode::Auto`].
930    #[instrument(level = "trace", skip(self, certificate, notifier))]
931    #[inline]
932    pub async fn fully_handle_confirmed_certificate_with_notifications(
933        &self,
934        certificate: ConfirmedBlockCertificate,
935        mode: ProcessConfirmedBlockMode,
936        notifier: &impl Notifier,
937    ) -> Result<ChainInfoResponse, WorkerError> {
938        let notifications = (*notifier).clone();
939        let this = self.clone();
940        linera_base::Task::spawn(async move {
941            let (response, actions) =
942                Box::pin(this.handle_confirmed_certificate(certificate, mode, None)).await?;
943            notifications.notify(&actions.notifications);
944            let mut requests = VecDeque::from(actions.cross_chain_requests);
945            while let Some(request) = requests.pop_front() {
946                let actions = this.handle_cross_chain_request(request).await?;
947                requests.extend(actions.cross_chain_requests);
948                notifications.notify(&actions.notifications);
949            }
950            Ok(response)
951        })
952        .await
953    }
954
955    /// Acquires a read lock on the chain worker and executes the given closure.
956    ///
957    /// The future is boxed to keep deeply nested types off the stack. On non-web
958    /// targets it is also wrapped in `SyncFuture` to satisfy `Sync` bounds.
959    async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
960    where
961        F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
962        Fut: std::future::Future<Output = Result<R, WorkerError>>,
963    {
964        let state = self.get_or_create_chain_worker(chain_id).await?;
965        let state_ref = &state;
966        let result = Box::pin(wrap_future(async move {
967            let guard = handle::read_lock(state_ref).await?;
968            f(guard).await
969        }))
970        .await;
971        if let Err(error) = &result {
972            if error.must_reload_view() {
973                self.evict_poisoned_worker(chain_id, &state);
974            }
975        }
976        result
977    }
978
979    /// Acquires a write lock on the chain worker and executes the given closure.
980    ///
981    /// The write work runs on a detached task (via [`linera_base::task::run_detached`])
982    /// so that caller cancellation does not unwind the task mid-save. The
983    /// [`RollbackGuard`] lives inside the detached task, so the write lock is held
984    /// until the DB round-trip and `post_save` have fully completed — subsequent
985    /// readers, including a freshly-loaded replacement worker, only see the
986    /// committed state.
987    async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
988    where
989        F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut
990            + linera_base::task::MaybeSend
991            + 'static,
992        Fut: std::future::Future<Output = Result<R, WorkerError>> + linera_base::task::MaybeSend,
993        R: linera_base::task::MaybeSend + 'static,
994    {
995        let state = self.get_or_create_chain_worker(chain_id).await?;
996        let state_for_task = state.clone();
997        let result = Box::pin(wrap_future(linera_base::task::run_detached(async move {
998            let guard = handle::write_lock(&state_for_task).await?;
999            f(guard).await
1000        })))
1001        .await;
1002        if let Err(error) = &result {
1003            if error.must_reload_view() {
1004                self.evict_poisoned_worker(chain_id, &state);
1005            } else if error.indicates_corrupted_chain_state() {
1006                self.spawn_reset_corrupted_chain_state(chain_id, state);
1007            }
1008        }
1009        result
1010    }
1011
1012    /// Spawns a detached task that re-acquires the write lock and recovers the
1013    /// chain from a detected state corruption. Running the recovery in a separate
1014    /// task ensures it survives cancellation of the originating request: if the
1015    /// caller's future is dropped mid-way through re-execution, the chain would
1016    /// otherwise be left at a partial tip and our safety snapshot would be lost.
1017    /// Generated `RevertConfirm` requests are dispatched via the installed
1018    /// shard-routing sender when present (sharded validators), or locally
1019    /// through `handle_cross_chain_request` otherwise (client nodes and tests).
1020    /// Errors are logged; the caller already has the original error to
1021    /// propagate.
1022    fn spawn_reset_corrupted_chain_state(
1023        &self,
1024        chain_id: ChainId,
1025        state: ChainWorkerArc<StorageClient>,
1026    ) where
1027        StorageClient: Clone,
1028    {
1029        let this = self.clone();
1030        linera_base::Task::spawn(async move {
1031            let requests = {
1032                let mut guard = match handle::write_lock(&state).await {
1033                    Ok(guard) => guard,
1034                    Err(error) => {
1035                        tracing::error!(
1036                            %chain_id, %error,
1037                            "Failed to acquire write lock to reset corrupted chain state"
1038                        );
1039                        return;
1040                    }
1041                };
1042                match guard.maybe_reset_corrupted_chain_state().await {
1043                    Ok(Some(requests)) => requests,
1044                    Ok(None) => return,
1045                    Err(error) => {
1046                        tracing::error!(
1047                            %chain_id, %error, "Failed to reset corrupted chain state"
1048                        );
1049                        return;
1050                    }
1051                }
1052            };
1053            if let Some(sender) = &this.outbound_cross_chain_sender {
1054                // Sharded validator path: let the RPC layer route each request to
1055                // the shard that owns the target chain.
1056                for request in requests {
1057                    sender(request);
1058                }
1059            } else {
1060                // No routing dispatcher is installed (client node or test), so all
1061                // involved chains are co-located on this worker. Dispatch locally
1062                // in a loop, following any cascading cross-chain requests.
1063                let mut queue = VecDeque::from(requests);
1064                while let Some(request) = queue.pop_front() {
1065                    match this.handle_cross_chain_request(request).await {
1066                        Ok(actions) => queue.extend(actions.cross_chain_requests),
1067                        Err(error) => {
1068                            warn!(
1069                                %chain_id, %error,
1070                                "Failed to dispatch cross-chain request after \
1071                                resetting corrupted chain state"
1072                            );
1073                        }
1074                    }
1075                }
1076            }
1077        })
1078        .forget();
1079    }
1080
1081    /// Evicts a poisoned chain worker from the cache, but only if the entry still
1082    /// points to the same instance. This avoids removing a fresh replacement that
1083    /// another task may have already loaded.
1084    fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
1085        tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
1086        let pin = self.chain_workers.pin();
1087        let weak_poisoned = Arc::downgrade(poisoned);
1088        let removed = pin.remove_if(&chain_id, |_key, future| {
1089            future
1090                .peek()
1091                .and_then(|r| r.clone().ok())
1092                .is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
1093        });
1094        if removed.is_err() {
1095            tracing::trace!(%chain_id, "Poisoned worker entry already replaced; skipping eviction");
1096        }
1097    }
1098
1099    /// Returns or creates the per-chain batch state.
1100    async fn get_or_create_chain_batch(
1101        &self,
1102        chain_id: ChainId,
1103    ) -> Result<(mpsc::UnboundedSender<BatchRequest>, Shared<BatchFuture>), WorkerError> {
1104        // Fast path: reuse an existing live driver. The pin guard is !Send,
1105        // so it must be dropped before any .await point.
1106        if let Some(batch_processor) = self.chain_batches.pin().get(&chain_id) {
1107            if let Some(future) = batch_processor.future.upgrade() {
1108                return Ok((batch_processor.sender.clone(), future));
1109            }
1110        }
1111        let state = self.get_or_create_chain_worker(chain_id).await?;
1112        let (new_request_processor, new_future) = ChainBatchRequestProcessor::create(
1113            state,
1114            self.chain_worker_config.cross_chain_batch_size_limit,
1115        );
1116        match self
1117            .chain_batches
1118            .pin()
1119            .compute(chain_id, |existing| match existing {
1120                Some((_, batch_processor)) => {
1121                    if let Some(future) = batch_processor.future.upgrade() {
1122                        papaya::Operation::Abort((batch_processor.sender.clone(), future))
1123                    } else {
1124                        papaya::Operation::Insert(new_request_processor.clone())
1125                    }
1126                }
1127                None => papaya::Operation::Insert(new_request_processor.clone()),
1128            }) {
1129            papaya::Compute::Aborted((sender, future)) => Ok((sender, future)),
1130            papaya::Compute::Inserted(_, batch_processor)
1131            | papaya::Compute::Updated {
1132                new: (_, batch_processor),
1133                ..
1134            } => Ok((batch_processor.sender.clone(), new_future)),
1135            papaya::Compute::Removed { .. } => unreachable!(),
1136        }
1137    }
1138
1139    /// Gets or creates a chain worker for the given chain.
1140    ///
1141    /// The oneshot channel is created outside the `compute` closure to keep
1142    /// the closure pure (papaya may call it more than once on CAS retry and
1143    /// may memoize the output). If the fast path hits, the unused channel is
1144    /// dropped harmlessly.
1145    ///
1146    /// Returns a type-erased future to keep `!Sync` intermediate types (e.g.
1147    /// `std::sync::mpsc::Receiver` from `handle::ServiceRuntimeActor::spawn`) out of
1148    /// the caller's future type.
1149    fn get_or_create_chain_worker(
1150        &self,
1151        chain_id: ChainId,
1152    ) -> std::pin::Pin<
1153        Box<
1154            impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
1155        >,
1156    > {
1157        Box::pin(wrap_future(async move {
1158            loop {
1159                // Create the channel outside the closure so that the
1160                // sender/receiver always match regardless of CAS retries.
1161                let (sender, receiver) = oneshot::channel();
1162                let shared_receiver = receiver.shared();
1163
1164                // The papaya guard is !Send, so it must be dropped before
1165                // any .await point.
1166                let wait_or_sender = {
1167                    let pin = self.chain_workers.pin();
1168                    match pin.compute(chain_id, |existing| match existing {
1169                        Some((_, entry)) => match entry.peek() {
1170                            Some(Ok(weak)) => match weak.upgrade() {
1171                                Some(arc) => papaya::Operation::Abort(Ok(arc)),
1172                                None => papaya::Operation::Insert(shared_receiver.clone()),
1173                            },
1174                            Some(Err(_)) => papaya::Operation::Insert(shared_receiver.clone()),
1175                            None => papaya::Operation::Abort(Err(entry.clone())),
1176                        },
1177                        None => papaya::Operation::Insert(shared_receiver.clone()),
1178                    }) {
1179                        papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc),
1180                        papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait),
1181                        papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => {
1182                            Either::Right(sender)
1183                        }
1184                        papaya::Compute::Removed { .. } => unreachable!(),
1185                    }
1186                };
1187
1188                match wait_or_sender {
1189                    Either::Left(wait) => {
1190                        // Another task is loading. Await the shared future.
1191                        if let Ok(weak) = wait.await {
1192                            if let Some(arc) = weak.upgrade() {
1193                                return Ok(arc);
1194                            }
1195                        }
1196                        // Loading failed or worker already dead; retry.
1197                    }
1198                    Either::Right(sender) => {
1199                        // We claimed the loading slot. Load from storage.
1200                        // On success, send the Weak through the channel.
1201                        // On error, dropping sender wakes waiters so they can retry.
1202                        let worker = self.load_chain_worker(chain_id).await?;
1203                        if sender.send(Arc::downgrade(&worker)).is_err() {
1204                            tracing::error!(%chain_id, "Receiver dropped while loading worker state.");
1205                            continue;
1206                        }
1207                        return Ok(worker);
1208                    }
1209                }
1210            }
1211        }))
1212    }
1213
1214    /// Loads a chain worker state from storage and wraps it in an Arc.
1215    async fn load_chain_worker(
1216        &self,
1217        chain_id: ChainId,
1218    ) -> Result<ChainWorkerArc<StorageClient>, WorkerError> {
1219        let delivery_notifier = self
1220            .delivery_notifiers
1221            .lock()
1222            .unwrap()
1223            .entry(chain_id)
1224            .or_default()
1225            .clone();
1226
1227        // `chain_modes=None` means "no tracked/sender distinction" (validators) — treat
1228        // every chain as tracked so it routes through `config.ttl`, not the unset
1229        // `config.sender_chain_ttl` which would skip `spawn_keep_alive` entirely.
1230        let is_tracked = self.chain_modes.as_ref().is_none_or(|chain_modes| {
1231            chain_modes
1232                .read()
1233                .unwrap()
1234                .get(&chain_id)
1235                .is_some_and(ListeningMode::is_full)
1236        });
1237
1238        let (service_runtime_endpoint, service_runtime_task) =
1239            if self.chain_worker_config.long_lived_services {
1240                let actor =
1241                    handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
1242                (Some(actor.endpoint), Some(actor.task))
1243            } else {
1244                (None, None)
1245            };
1246
1247        let state = crate::chain_worker::state::ChainWorkerState::load(
1248            self.chain_worker_config.clone(),
1249            self.storage.clone(),
1250            self.block_cache.clone(),
1251            self.execution_state_cache.clone(),
1252            self.chain_modes.clone(),
1253            delivery_notifier,
1254            chain_id,
1255            service_runtime_endpoint,
1256            service_runtime_task,
1257        )
1258        .await?;
1259
1260        Ok(handle::create_chain_worker(
1261            state,
1262            is_tracked,
1263            &self.chain_worker_config,
1264        ))
1265    }
1266
1267    /// Tries to execute a block proposal with a policy for handling bundle failures.
1268    ///
1269    /// Returns the modified block (bundles may be rejected/removed), the executed block,
1270    /// chain info response, and resource tracker.
1271    #[instrument(level = "trace", skip(self, block))]
1272    pub async fn stage_block_execution(
1273        &self,
1274        block: ProposedBlock,
1275        round: Option<u32>,
1276        published_blobs: Vec<Blob>,
1277        policy: BundleExecutionPolicy,
1278    ) -> Result<
1279        (
1280            ProposedBlock,
1281            Block,
1282            ChainInfoResponse,
1283            ResourceTracker,
1284            HashSet<ChainId>,
1285        ),
1286        WorkerError,
1287    > {
1288        let chain_id = block.chain_id;
1289        self.chain_write(chain_id, move |mut guard| async move {
1290            guard
1291                .stage_block_execution(block, round, &published_blobs, policy)
1292                .await
1293        })
1294        .await
1295    }
1296
1297    /// Executes a [`Query`] for an application's state on a specific chain.
1298    ///
1299    /// If `block_hash` is specified, system will query the application's state
1300    /// at that block. If it doesn't exist, it uses latest state.
1301    #[instrument(level = "trace", skip(self, chain_id, query))]
1302    pub async fn query_application(
1303        &self,
1304        chain_id: ChainId,
1305        query: Query,
1306        block_hash: Option<CryptoHash>,
1307    ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
1308        self.chain_write(chain_id, move |mut guard| async move {
1309            guard.query_application(query, block_hash).await
1310        })
1311        .await
1312    }
1313
1314    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
1315        nickname = %self.nickname(),
1316        chain_id = %chain_id,
1317        application_id = %application_id
1318    ))]
1319    pub async fn describe_application(
1320        &self,
1321        chain_id: ChainId,
1322        application_id: ApplicationId,
1323    ) -> Result<ApplicationDescription, WorkerError> {
1324        let state = self.get_or_create_chain_worker(chain_id).await?;
1325        let guard = handle::read_lock_initialized(&state).await?;
1326        guard.describe_application_readonly(application_id).await
1327    }
1328
1329    /// Processes a confirmed block (aka a commit).
1330    #[instrument(
1331        level = "trace",
1332        skip(self, certificate, notify_when_messages_are_delivered),
1333        fields(
1334            nickname = %self.nickname(),
1335            chain_id = %certificate.block().header.chain_id,
1336            block_height = %certificate.block().header.height
1337        )
1338    )]
1339    async fn process_confirmed_block(
1340        &self,
1341        certificate: ConfirmedBlockCertificate,
1342        mode: ProcessConfirmedBlockMode,
1343        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1344    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1345        let chain_id = certificate.block().header.chain_id;
1346        self.chain_write(chain_id, move |mut guard| async move {
1347            guard
1348                .process_confirmed_block(certificate, mode, notify_when_messages_are_delivered)
1349                .await
1350        })
1351        .await
1352    }
1353
1354    /// Processes a validated block issued from a multi-owner chain.
1355    #[instrument(level = "trace", skip(self, certificate), fields(
1356        nickname = %self.nickname(),
1357        chain_id = %certificate.block().header.chain_id,
1358        block_height = %certificate.block().header.height
1359    ))]
1360    async fn process_validated_block(
1361        &self,
1362        certificate: ValidatedBlockCertificate,
1363    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1364        let chain_id = certificate.block().header.chain_id;
1365        self.chain_write(chain_id, move |mut guard| async move {
1366            guard.process_validated_block(certificate).await
1367        })
1368        .await
1369    }
1370
1371    /// Processes a leader timeout issued from a multi-owner chain.
1372    #[instrument(level = "trace", skip(self, certificate), fields(
1373        nickname = %self.nickname(),
1374        chain_id = %certificate.value().chain_id(),
1375        height = %certificate.value().height()
1376    ))]
1377    async fn process_timeout(
1378        &self,
1379        certificate: TimeoutCertificate,
1380    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1381        let chain_id = certificate.value().chain_id();
1382        self.chain_write(chain_id, move |mut guard| async move {
1383            guard.process_timeout(certificate).await
1384        })
1385        .await
1386    }
1387
1388    /// Enqueues a cross-chain update request and cooperatively drives the
1389    /// batch-processing future until the result is ready.
1390    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
1391        nickname = %self.nickname(),
1392        origin = %origin,
1393        recipient = %recipient,
1394        num_bundles = %bundles.len()
1395    ))]
1396    async fn process_cross_chain_update(
1397        &self,
1398        origin: ChainId,
1399        recipient: ChainId,
1400        bundles: Vec<(Epoch, MessageBundle)>,
1401        previous_height: Option<BlockHeight>,
1402    ) -> Result<CrossChainUpdateResult, WorkerError> {
1403        let (result_sender, receiver) = oneshot::channel();
1404        let request = BatchRequest::Update {
1405            origin,
1406            bundles,
1407            previous_height,
1408            result_sender,
1409        };
1410        self.enqueue_and_drive(recipient, request, receiver).await
1411    }
1412
1413    /// Enqueues a confirmation request and cooperatively drives the
1414    /// batch-processing future until the result is ready.
1415    async fn confirm_updated_recipient(
1416        &self,
1417        sender: ChainId,
1418        recipient: ChainId,
1419        latest_height: BlockHeight,
1420    ) -> Result<NetworkActions, WorkerError> {
1421        let (result_sender, receiver) = oneshot::channel();
1422        let request = BatchRequest::Confirm {
1423            recipient,
1424            latest_height,
1425            result_sender,
1426        };
1427        self.enqueue_and_drive(sender, request, receiver).await
1428    }
1429
1430    /// Sends a [`BatchRequest`] to the per-chain driver and cooperatively
1431    /// polls the shared processing future until our result arrives.
1432    async fn enqueue_and_drive<R>(
1433        &self,
1434        chain_id: ChainId,
1435        request: BatchRequest,
1436        mut receiver: oneshot::Receiver<Result<R, WorkerError>>,
1437    ) -> Result<R, WorkerError> {
1438        let mut pending = Some(request);
1439        loop {
1440            let (sender, future) = self.get_or_create_chain_batch(chain_id).await?;
1441            if let Some(request) = pending.take() {
1442                if let Err(mpsc::error::SendError(request)) = sender.send(request) {
1443                    pending = Some(request);
1444                    continue; // Driver died; retry will create a new one.
1445                }
1446            }
1447            // Poll the receiver first (biased): if the result is already available,
1448            // return it immediately without driving the batch future further.
1449            match future::select(pin::pin!(&mut receiver), future).await {
1450                Either::Left((result, _)) => {
1451                    return result.expect("batch result sender dropped");
1452                }
1453                Either::Right(((), _)) => match receiver.try_recv() {
1454                    Ok(result) => return result,
1455                    Err(oneshot::error::TryRecvError::Empty) => {}
1456                    Err(oneshot::error::TryRecvError::Closed) => {
1457                        return Err(WorkerError::ChainError(Box::new(
1458                            ChainError::InternalError("batch driver stopped".into()),
1459                        )));
1460                    }
1461                },
1462            }
1463        }
1464    }
1465
/// Looks up the stored [`ConfirmedBlockCertificate`] for the block at `height` on chain
/// `chain_id`, read through `handle::read_lock_initialized`.
#[instrument(level = "trace", skip(self, chain_id, height), fields(
    nickname = %self.nickname(),
    chain_id = %chain_id,
    height = %height
))]
#[cfg(with_testing)]
pub async fn read_certificate(
    &self,
    chain_id: ChainId,
    height: BlockHeight,
) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, WorkerError> {
    let worker = self.get_or_create_chain_worker(chain_id).await?;
    let chain = handle::read_lock_initialized(&worker).await?;
    let certificate = chain.read_certificate(height).await;
    certificate
}
1482
/// Test-only entry point for `ChainWorkerState::select_message_bundles`.
///
/// Takes read access to the worker of the `recipient` chain and forwards `origin`,
/// `next_height_to_receive`, `last_anticipated_block_height` and `bundles` unchanged.
#[cfg(with_testing)]
pub async fn select_message_bundles(
    &self,
    recipient: ChainId,
    origin: &ChainId,
    next_height_to_receive: BlockHeight,
    last_anticipated_block_height: Option<BlockHeight>,
    bundles: Vec<(Epoch, MessageBundle)>,
) -> Result<Vec<MessageBundle>, WorkerError> {
    let recipient_worker = self.get_or_create_chain_worker(recipient).await?;
    let reader = handle::read_lock(&recipient_worker).await?;
    let selection = reader.select_message_bundles(
        origin,
        next_height_to_receive,
        last_anticipated_block_height,
        bundles,
    );
    selection.await
}
1505
1506    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
1507    /// [`ChainId`].
1508    ///
1509    /// The returned guard holds a read lock on the chain state, preventing writes for
1510    /// its lifetime. Multiple concurrent readers are allowed.
1511    #[instrument(level = "trace", skip(self), fields(
1512        nickname = %self.nickname(),
1513        chain_id = %chain_id
1514    ))]
1515    pub async fn chain_state_view(
1516        &self,
1517        chain_id: ChainId,
1518    ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1519        let state = self.get_or_create_chain_worker(chain_id).await?;
1520        let guard = handle::read_lock(&state).await?;
1521        Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1522            guard,
1523            |s| s.chain(),
1524        )))
1525    }
1526
1527    #[instrument(skip_all, fields(
1528        nick = self.nickname(),
1529        chain_id = format!("{:.8}", proposal.content.block.chain_id),
1530        height = %proposal.content.block.height,
1531    ))]
1532    pub async fn handle_block_proposal(
1533        &self,
1534        proposal: BlockProposal,
1535    ) -> (Result<ChainInfoResponse, WorkerError>, NetworkActions) {
1536        trace!("{} <-- {:?}", self.nickname(), proposal);
1537        #[cfg(with_metrics)]
1538        let round = proposal.content.round;
1539
1540        let chain_id = proposal.content.block.chain_id;
1541        // Delay if block timestamp is in the future but within grace period.
1542        let now = self.storage.clock().current_time();
1543        let block_timestamp = proposal.content.block.timestamp;
1544        let delta = block_timestamp.delta_since(now);
1545        let grace_period = TimeDelta::from_micros(
1546            u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1547                .unwrap_or(u64::MAX),
1548        );
1549        if delta > TimeDelta::ZERO && delta <= grace_period {
1550            self.storage.clock().sleep_until(block_timestamp).await;
1551        }
1552
1553        let outcome = self
1554            .chain_write(chain_id, move |mut guard| async move {
1555                Ok::<_, WorkerError>(guard.handle_block_proposal(proposal).await)
1556            })
1557            .await;
1558        let (result, actions) = match outcome {
1559            Ok((result, actions)) => (result, actions),
1560            Err(err) => (Err(err), NetworkActions::default()),
1561        };
1562        #[cfg(with_metrics)]
1563        if result.is_ok() {
1564            metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1565                .with_label_values(&[round.type_name()])
1566                .observe(round.number() as f64);
1567        }
1568        (result, actions)
1569    }
1570
1571    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
1572    // Other fields will be included in the caller's span.
1573    #[instrument(skip_all, fields(
1574        chain_id = %certificate.value.chain_id,
1575        hash = %certificate.value.value_hash,
1576    ))]
1577    pub async fn handle_lite_certificate(
1578        &self,
1579        certificate: LiteCertificate<'_>,
1580        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1581    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1582        match self.full_certificate(certificate).await? {
1583            Either::Left(confirmed) => {
1584                Box::pin(self.handle_confirmed_certificate(
1585                    confirmed,
1586                    ProcessConfirmedBlockMode::Auto,
1587                    notify_when_messages_are_delivered,
1588                ))
1589                .await
1590            }
1591            Either::Right(validated) => {
1592                if let Some(notifier) = notify_when_messages_are_delivered {
1593                    // Nothing to wait for.
1594                    if let Err(()) = notifier.send(()) {
1595                        debug!("Failed to notify message delivery to caller (validation cert)");
1596                    }
1597                }
1598                Box::pin(self.handle_validated_certificate(validated)).await
1599            }
1600        }
1601    }
1602
1603    /// Processes a confirmed block certificate.
1604    #[instrument(skip_all, fields(
1605        nick = self.nickname(),
1606        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1607        height = %certificate.block().header.height,
1608    ))]
1609    pub async fn handle_confirmed_certificate(
1610        &self,
1611        certificate: ConfirmedBlockCertificate,
1612        mode: ProcessConfirmedBlockMode,
1613        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1614    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1615        trace!("{} <-- {:?}", self.nickname(), certificate);
1616        #[cfg(with_metrics)]
1617        let metrics_data = metrics::MetricsData::new(&certificate);
1618
1619        #[allow(unused_variables)]
1620        let (info, actions, outcome) = Box::pin(self.process_confirmed_block(
1621            certificate,
1622            mode,
1623            notify_when_messages_are_delivered,
1624        ))
1625        .await?;
1626
1627        #[cfg(with_metrics)]
1628        if matches!(outcome, BlockOutcome::Processed) {
1629            metrics_data.record();
1630        }
1631        Ok((info, actions))
1632    }
1633
1634    /// Processes a validated block certificate.
1635    #[instrument(skip_all, fields(
1636        nick = self.nickname(),
1637        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1638        height = %certificate.block().header.height,
1639    ))]
1640    pub async fn handle_validated_certificate(
1641        &self,
1642        certificate: ValidatedBlockCertificate,
1643    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1644        trace!("{} <-- {:?}", self.nickname(), certificate);
1645
1646        #[cfg(with_metrics)]
1647        let round = certificate.round;
1648        #[cfg(with_metrics)]
1649        let cert_str = certificate.inner().to_log_str();
1650
1651        #[allow(unused_variables)]
1652        let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1653        #[cfg(with_metrics)]
1654        {
1655            if matches!(outcome, BlockOutcome::Processed) {
1656                metrics::NUM_ROUNDS_IN_CERTIFICATE
1657                    .with_label_values(&[cert_str, round.type_name()])
1658                    .observe(round.number() as f64);
1659            }
1660        }
1661        Ok((info, actions))
1662    }
1663
1664    /// Processes a timeout certificate
1665    #[instrument(skip_all, fields(
1666        nick = self.nickname(),
1667        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1668        height = %certificate.inner().height(),
1669    ))]
1670    pub async fn handle_timeout_certificate(
1671        &self,
1672        certificate: TimeoutCertificate,
1673    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1674        trace!("{} <-- {:?}", self.nickname(), certificate);
1675        self.process_timeout(certificate).await
1676    }
1677
1678    #[instrument(skip_all, fields(
1679        nick = self.nickname(),
1680        chain_id = format!("{:.8}", query.chain_id)
1681    ))]
1682    pub async fn handle_chain_info_query(
1683        &self,
1684        query: ChainInfoQuery,
1685    ) -> Result<ChainInfoResponse, WorkerError> {
1686        trace!("{} <-- {:?}", self.nickname(), query);
1687        #[cfg(with_metrics)]
1688        metrics::CHAIN_INFO_QUERIES.inc();
1689        let chain_id = query.chain_id;
1690        let result = self
1691            .chain_write(chain_id, move |mut guard| async move {
1692                guard.handle_chain_info_query(query).await
1693            })
1694            .await;
1695        trace!("{} --> {:?}", self.nickname(), result);
1696        result
1697    }
1698
1699    #[instrument(skip_all, fields(
1700        nick = self.nickname(),
1701        chain_id = format!("{:.8}", chain_id)
1702    ))]
1703    pub async fn download_pending_blob(
1704        &self,
1705        chain_id: ChainId,
1706        blob_id: BlobId,
1707    ) -> Result<CacheArc<Blob>, WorkerError> {
1708        trace!("{} <-- download_pending_blob({blob_id:8})", self.nickname());
1709        let result = self
1710            .chain_read(chain_id, |guard| async move {
1711                guard.download_pending_blob(blob_id).await
1712            })
1713            .await;
1714        trace!(
1715            "{} --> {:?}",
1716            self.nickname(),
1717            result.as_ref().map(|_| blob_id)
1718        );
1719        result
1720    }
1721
1722    #[instrument(skip_all, fields(
1723        nick = self.nickname(),
1724        chain_id = format!("{:.8}", chain_id)
1725    ))]
1726    pub async fn handle_pending_blob(
1727        &self,
1728        chain_id: ChainId,
1729        blob: Blob,
1730    ) -> Result<ChainInfoResponse, WorkerError> {
1731        let blob_id = blob.id();
1732        trace!("{} <-- handle_pending_blob({blob_id:8})", self.nickname());
1733        let result = self
1734            .chain_write(chain_id, move |mut guard| async move {
1735                guard.handle_pending_blob(blob).await
1736            })
1737            .await;
1738        trace!(
1739            "{} --> {:?}",
1740            self.nickname(),
1741            result.as_ref().map(|_| blob_id)
1742        );
1743        result
1744    }
1745
1746    #[instrument(skip_all, fields(
1747        nick = self.nickname(),
1748        chain_id = format!("{:.8}", request.target_chain_id())
1749    ))]
1750    pub async fn handle_cross_chain_request(
1751        &self,
1752        request: CrossChainRequest,
1753    ) -> Result<NetworkActions, WorkerError> {
1754        trace!("{} <-- {:?}", self.nickname(), request);
1755        match request {
1756            CrossChainRequest::UpdateRecipient {
1757                sender,
1758                recipient,
1759                bundles,
1760                previous_height,
1761            } => {
1762                let mut actions = NetworkActions::default();
1763                let origin = sender;
1764                match self
1765                    .process_cross_chain_update(origin, recipient, bundles, previous_height)
1766                    .await?
1767                {
1768                    CrossChainUpdateResult::NothingToDo => {}
1769                    CrossChainUpdateResult::Updated(height) => {
1770                        actions.notifications.push(Notification {
1771                            chain_id: recipient,
1772                            reason: Reason::NewIncomingBundle { origin, height },
1773                        });
1774                        actions.cross_chain_requests.push(
1775                            CrossChainRequest::ConfirmUpdatedRecipient {
1776                                sender,
1777                                recipient,
1778                                latest_height: height,
1779                            },
1780                        );
1781                    }
1782                    CrossChainUpdateResult::GapDetected {
1783                        origin,
1784                        retransmit_from,
1785                    } => {
1786                        actions
1787                            .cross_chain_requests
1788                            .push(CrossChainRequest::RevertConfirm {
1789                                sender: origin,
1790                                recipient,
1791                                retransmit_from,
1792                            });
1793                    }
1794                }
1795                Ok(actions)
1796            }
1797            CrossChainRequest::ConfirmUpdatedRecipient {
1798                sender,
1799                recipient,
1800                latest_height,
1801            } => {
1802                let actions = self
1803                    .confirm_updated_recipient(sender, recipient, latest_height)
1804                    .await?;
1805                Ok(actions)
1806            }
1807            CrossChainRequest::RevertConfirm {
1808                sender,
1809                recipient,
1810                retransmit_from,
1811            } => {
1812                self.chain_write(sender, move |mut guard| async move {
1813                    guard
1814                        .handle_revert_confirm(recipient, retransmit_from)
1815                        .await
1816                })
1817                .await
1818            }
1819        }
1820    }
1821
1822    /// Updates the received certificate trackers to at least the given values.
1823    #[instrument(skip_all, fields(
1824        nickname = %self.nickname(),
1825        chain_id = %chain_id,
1826        num_trackers = %new_trackers.len()
1827    ))]
1828    pub async fn update_received_certificate_trackers(
1829        &self,
1830        chain_id: ChainId,
1831        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1832    ) -> Result<(), WorkerError> {
1833        self.chain_write(chain_id, move |mut guard| async move {
1834            guard
1835                .update_received_certificate_trackers(new_trackers)
1836                .await
1837        })
1838        .await
1839    }
1840
1841    /// Gets preprocessed block hashes in a given height range.
1842    #[instrument(skip_all, fields(
1843        nickname = %self.nickname(),
1844        chain_id = %chain_id,
1845        start = %start,
1846        end = %end
1847    ))]
1848    pub async fn get_preprocessed_block_hashes(
1849        &self,
1850        chain_id: ChainId,
1851        start: BlockHeight,
1852        end: BlockHeight,
1853    ) -> Result<Vec<CryptoHash>, WorkerError> {
1854        self.chain_read(chain_id, |guard| async move {
1855            guard.get_preprocessed_block_hashes(start, end).await
1856        })
1857        .await
1858    }
1859
1860    /// Gets the next block height to receive from an inbox.
1861    #[instrument(skip_all, fields(
1862        nickname = %self.nickname(),
1863        chain_id = %chain_id,
1864        origin = %origin
1865    ))]
1866    pub async fn get_inbox_next_height(
1867        &self,
1868        chain_id: ChainId,
1869        origin: ChainId,
1870    ) -> Result<BlockHeight, WorkerError> {
1871        self.chain_read(chain_id, |guard| async move {
1872            guard.get_inbox_next_height(origin).await
1873        })
1874        .await
1875    }
1876
1877    /// Gets locking blobs for specific blob IDs.
1878    /// Returns `Ok(None)` if any of the blobs is not found.
1879    #[instrument(skip_all, fields(
1880        nickname = %self.nickname(),
1881        chain_id = %chain_id,
1882        num_blob_ids = %blob_ids.len()
1883    ))]
1884    pub async fn get_locking_blobs(
1885        &self,
1886        chain_id: ChainId,
1887        blob_ids: Vec<BlobId>,
1888    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1889        self.chain_read(chain_id, |guard| async move {
1890            guard.get_locking_blobs(blob_ids).await
1891        })
1892        .await
1893    }
1894
1895    /// Gets block hashes for the given heights.
1896    pub async fn get_block_hashes(
1897        &self,
1898        chain_id: ChainId,
1899        heights: Vec<BlockHeight>,
1900    ) -> Result<Vec<CryptoHash>, WorkerError> {
1901        self.chain_read(chain_id, |guard| async move {
1902            guard.get_block_hashes(heights).await
1903        })
1904        .await
1905    }
1906
1907    /// Gets proposed blobs from the manager for specified blob IDs.
1908    pub async fn get_proposed_blobs(
1909        &self,
1910        chain_id: ChainId,
1911        blob_ids: Vec<BlobId>,
1912    ) -> Result<Vec<Blob>, WorkerError> {
1913        self.chain_read(chain_id, |guard| async move {
1914            guard.get_proposed_blobs(blob_ids).await
1915        })
1916        .await
1917    }
1918
1919    /// Gets event subscriptions from the chain.
1920    pub async fn get_event_subscriptions(
1921        &self,
1922        chain_id: ChainId,
1923    ) -> Result<EventSubscriptionsResult, WorkerError> {
1924        self.chain_read(chain_id, |guard| async move {
1925            guard.get_event_subscriptions().await
1926        })
1927        .await
1928    }
1929
1930    /// Gets the next expected event index for a stream.
1931    pub async fn get_next_expected_event(
1932        &self,
1933        chain_id: ChainId,
1934        stream_id: StreamId,
1935    ) -> Result<Option<u32>, WorkerError> {
1936        self.chain_read(chain_id, |guard| async move {
1937            guard.get_next_expected_event(stream_id).await
1938        })
1939        .await
1940    }
1941
1942    /// Gets the `next_expected_events` indices for the given streams.
1943    pub async fn next_expected_events(
1944        &self,
1945        chain_id: ChainId,
1946        stream_ids: Vec<StreamId>,
1947    ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1948        self.chain_read(chain_id, |guard| async move {
1949            guard.get_next_expected_events(stream_ids).await
1950        })
1951        .await
1952    }
1953
1954    /// Gets received certificate trackers.
1955    pub async fn get_received_certificate_trackers(
1956        &self,
1957        chain_id: ChainId,
1958    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1959        self.chain_read(chain_id, |guard| async move {
1960            guard.get_received_certificate_trackers().await
1961        })
1962        .await
1963    }
1964
1965    /// Returns the pending cross-chain network actions for this chain without
1966    /// initializing its execution state. Safe to call on chains whose
1967    /// `ChainDescription` blob is not available locally.
1968    pub async fn cross_chain_network_actions(
1969        &self,
1970        chain_id: ChainId,
1971    ) -> Result<NetworkActions, WorkerError> {
1972        // Fast path: when the outbox index is already reconciled to the current tracked set,
1973        // the network actions are built from a read-only view, so a shared lock with no save
1974        // suffices — avoiding the write lock, task spawn and `save()` of the slow path.
1975        if let Some(actions) = self
1976            .chain_read(chain_id, |guard| async move {
1977                guard.cross_chain_network_actions_if_reconciled().await
1978            })
1979            .await?
1980        {
1981            return Ok(actions);
1982        }
1983        // Slow path (first load after migration, or the tracked set changed): reconcile and
1984        // persist the index under an exclusive lock before building.
1985        self.chain_write(chain_id, |mut guard| async move {
1986            guard.reconcile_and_cross_chain_network_actions().await
1987        })
1988        .await
1989    }
1990
1991    /// Gets tip state and outbox info for next_outbox_heights calculation.
1992    pub async fn get_tip_state_and_outbox_info(
1993        &self,
1994        chain_id: ChainId,
1995        receiver_id: ChainId,
1996    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1997        self.chain_read(chain_id, |guard| async move {
1998            guard.get_tip_state_and_outbox_info(receiver_id).await
1999        })
2000        .await
2001    }
2002
2003    /// Gets the next height to preprocess.
2004    pub async fn get_next_height_to_preprocess(
2005        &self,
2006        chain_id: ChainId,
2007    ) -> Result<BlockHeight, WorkerError> {
2008        self.chain_read(chain_id, |guard| async move {
2009            Ok(guard.get_next_height_to_preprocess())
2010        })
2011        .await
2012    }
2013}
2014
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage + Clone + 'static,
{
    /// Returns the [`ValidatorPublicKey`] of this validator, taken from the key pair
    /// in the chain worker configuration.
    ///
    /// # Panics
    ///
    /// If no key pair has been assigned to the validator.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain its public key",
        );
        key_pair.public()
    }
}