linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::{
12    future::{Either, Shared},
13    FutureExt as _,
14};
15use linera_base::{
16    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
17    data_types::{
18        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
19        Timestamp,
20    },
21    doc_scalar,
22    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
23};
24use linera_cache::{UniqueValueCache, ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
25#[cfg(with_testing)]
26use linera_chain::ChainExecutionContext;
27use linera_chain::{
28    data_types::{BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock},
29    types::{
30        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
31        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
32    },
33    ChainError, ChainStateView,
34};
35use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
36use linera_storage::{Clock as _, Storage};
37use linera_views::{context::InactiveContext, ViewError};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{oneshot, OwnedRwLockReadGuard};
41use tracing::{instrument, trace, warn};
42
43/// A read guard providing access to a chain's [`ChainStateView`].
44///
45/// Holds a read lock on the chain worker state, preventing writes for its
46/// lifetime. The `OwnedRwLockReadGuard` internally holds a strong `Arc`
47/// reference to the `RwLock<ChainWorkerState>`, keeping the state alive.
48/// Dereferences to `ChainStateView`.
49pub struct ChainStateViewReadGuard<S: Storage>(
50    OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
51);
52
53impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
54    type Target = ChainStateView<S::Context>;
55
56    fn deref(&self) -> &Self::Target {
57        &self.0
58    }
59}
60
61/// Re-export of [`EventSubscriptionsResult`] for use by other crate modules.
62pub(crate) use crate::chain_worker::EventSubscriptionsResult;
63use crate::{
64    chain_worker::{
65        handle, state::ChainWorkerState, BlockOutcome, ChainWorkerConfig, CrossChainUpdateResult,
66        DeliveryNotifier,
67    },
68    client::ListeningMode,
69    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
70    notifier::Notifier,
71};
72
/// Default size of the confirmed-block cache.
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000;
/// Default size of the execution-state cache.
pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
75
// Unit tests for this module are kept in `unit_tests/worker_tests.rs`.
#[path = "unit_tests/worker_tests.rs"]
#[cfg(test)]
mod worker_tests;
79
80/// Wraps a future in `SyncFuture` on non-web targets so that it satisfies `Sync` bounds.
81/// On web targets the future is returned as-is.
82#[cfg(not(web))]
83pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
84    sync_wrapper::SyncFuture::new(f)
85}
86
/// Makes `f` usable where a `Sync` future is required.
///
/// This is the web variant: the future is handed back untouched. The non-web
/// variant wraps it in `sync_wrapper::SyncFuture` instead.
#[cfg(web)]
pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> F {
    let unchanged = f;
    unchanged
}
93
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec,
    };
    use linera_chain::{data_types::MessageAction, types::ConfirmedBlockCertificate};
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Round number of each recorded certificate, labeled by certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Round number of block proposals, labeled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions in recorded blocks (label-less vector).
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles in recorded blocks.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of incoming bundles whose action is `Reject`.
    pub static REJECTED_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("rejected_bundle_count", "Rejected bundle count"));

    /// Running total of messages carried by incoming bundles.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations in recorded blocks.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per recorded block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per recorded block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per recorded block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Number of recorded blocks (label-less vector).
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Per-validator count of recorded certificates carrying that validator's signature.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Snapshot of the metric-relevant facts of one confirmed block certificate.
    ///
    /// Built with [`MetricsData::new`] and later flushed into the registry with
    /// [`MetricsData::record`].
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_rejected_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Collects counts and labels from `certificate` without touching any metric yet.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;

            // Walk the incoming bundles once, tallying bundles, rejections and messages.
            let mut bundle_total: usize = 0;
            let mut rejected_total: usize = 0;
            let mut message_total: usize = 0;
            for bundle in body.incoming_bundles() {
                bundle_total += 1;
                if bundle.action == MessageAction::Reject {
                    rejected_total += 1;
                }
                message_total += bundle.messages().count();
            }

            let validators_with_signatures: Vec<String> = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: certificate.round.type_name(),
                round_number: certificate.round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: bundle_total as u64,
                confirmed_rejected_bundles: rejected_total as u64,
                confirmed_incoming_messages: message_total as u64,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures,
            }
        }

        /// Pushes the collected values into the global metrics, consuming `self`.
        pub fn record(self) {
            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_rejected_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(f64::from(round_number));
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            // The running totals are only touched for blocks that carry transactions.
            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                let totals: [(&IntCounter, u64); 4] = [
                    (&*INCOMING_BUNDLE_COUNT, confirmed_incoming_bundles),
                    (&*REJECTED_BUNDLE_COUNT, confirmed_rejected_bundles),
                    (&*INCOMING_MESSAGE_COUNT, confirmed_incoming_messages),
                    (&*OPERATION_COUNT, confirmed_operations),
                ];
                for (counter, amount) in totals {
                    if amount > 0 {
                        counter.inc_by(amount);
                    }
                }
            }

            for validator_name in validators_with_signatures {
                CERTIFICATES_SIGNED
                    .with_label_values(&[&validator_name])
                    .inc();
            }
        }
    }
}
260
261/// Instruct the networking layer to send cross-chain requests and/or push notifications.
262#[derive(Default, Debug)]
263pub struct NetworkActions {
264    /// The cross-chain requests
265    pub cross_chain_requests: Vec<CrossChainRequest>,
266    /// The push notifications.
267    pub notifications: Vec<Notification>,
268}
269
270impl NetworkActions {
271    pub fn extend(&mut self, other: NetworkActions) {
272        self.cross_chain_requests.extend(other.cross_chain_requests);
273        self.notifications.extend(other.notifications);
274    }
275}
276
277#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
278/// Notification that a chain has a new certified block or a new message.
279pub struct Notification {
280    pub chain_id: ChainId,
281    pub reason: Reason,
282}
283
284doc_scalar!(
285    Notification,
286    "Notify that a chain has a new certified block or a new message"
287);
288
289#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
290/// Reason for the notification.
291pub enum Reason {
292    NewBlock {
293        height: BlockHeight,
294        hash: CryptoHash,
295    },
296    NewEvents {
297        height: BlockHeight,
298        block_hash: CryptoHash,
299        event_streams: BTreeSet<StreamId>,
300    },
301    NewIncomingBundle {
302        origin: ChainId,
303        height: BlockHeight,
304    },
305    NewRound {
306        height: BlockHeight,
307        round: Round,
308    },
309    BlockExecuted {
310        height: BlockHeight,
311        hash: CryptoHash,
312    },
313}
314
315/// Error type for worker operations.
316#[derive(Debug, Error, strum::IntoStaticStr)]
317pub enum WorkerError {
318    #[error(transparent)]
319    CryptoError(#[from] CryptoError),
320
321    #[error(transparent)]
322    ArithmeticError(#[from] ArithmeticError),
323
324    #[error(transparent)]
325    ViewError(#[from] ViewError),
326
327    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
328    ReadCertificatesError(Vec<CryptoHash>),
329
330    #[error(transparent)]
331    ChainError(#[from] Box<ChainError>),
332
333    #[error(transparent)]
334    BcsError(#[from] bcs::Error),
335
336    // Chain access control
337    #[error("Block was not signed by an authorized owner")]
338    InvalidOwner,
339
340    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
341    InvalidSigner(AccountOwner),
342
343    // Chaining
344    #[error(
345        "Chain is expecting a next block at height {expected_block_height} but the given block \
346        is at height {found_block_height} instead"
347    )]
348    UnexpectedBlockHeight {
349        expected_block_height: BlockHeight,
350        found_block_height: BlockHeight,
351    },
352    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
353    InvalidEpoch {
354        chain_id: ChainId,
355        chain_epoch: Epoch,
356        epoch: Epoch,
357    },
358
359    #[error("Events not found: {0:?}")]
360    EventsNotFound(Vec<EventId>),
361
362    // Other server-side errors
363    #[error("Invalid cross-chain request")]
364    InvalidCrossChainRequest,
365    #[error("The block does not contain the hash that we expected for the previous block")]
366    InvalidBlockChaining,
367    #[error(
368        "Block timestamp ({block_timestamp}) is further in the future from local time \
369        ({local_time}) than block time grace period ({block_time_grace_period:?})"
370    )]
371    InvalidTimestamp {
372        block_timestamp: Timestamp,
373        local_time: Timestamp,
374        block_time_grace_period: Duration,
375    },
376    #[error("We don't have the value for the certificate.")]
377    MissingCertificateValue,
378    #[error("The hash certificate doesn't match its value.")]
379    InvalidLiteCertificate,
380    #[error("Fast blocks cannot query oracles")]
381    FastBlockUsingOracles,
382    #[error("Blobs not found: {0:?}")]
383    BlobsNotFound(Vec<BlobId>),
384    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
385    ConfirmedLogEntryNotFound {
386        height: BlockHeight,
387        chain_id: ChainId,
388    },
389    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
390    PreprocessedBlocksEntryNotFound {
391        height: BlockHeight,
392        chain_id: ChainId,
393    },
394    #[error(
395        "confirmed_log/preprocessed_blocks entry at height {height} for chain {chain_id} not found"
396    )]
397    ConfirmedBlockHashNotFound {
398        height: BlockHeight,
399        chain_id: ChainId,
400    },
401    #[error("Block at height {height} on chain {chain_id} not found in local storage")]
402    LocalBlockNotFound {
403        height: BlockHeight,
404        chain_id: ChainId,
405    },
406    #[error("The block proposal is invalid: {0}")]
407    InvalidBlockProposal(String),
408    #[error("Blob was not required by any pending block")]
409    UnexpectedBlob,
410    #[error("Number of published blobs per block must not exceed {0}")]
411    TooManyPublishedBlobs(u64),
412    #[error("Missing network description")]
413    MissingNetworkDescription,
414    #[error("thread error: {0}")]
415    Thread(#[from] web_thread_pool::Error),
416    #[error("Chain worker was poisoned by a journal resolution failure")]
417    PoisonedWorker,
418}
419
420impl WorkerError {
421    /// Returns whether this error is caused by an issue in the local node.
422    ///
423    /// Returns `false` whenever the error could be caused by a bad message from a peer.
424    pub fn is_local(&self) -> bool {
425        match self {
426            WorkerError::CryptoError(_)
427            | WorkerError::ArithmeticError(_)
428            | WorkerError::InvalidOwner
429            | WorkerError::InvalidSigner(_)
430            | WorkerError::UnexpectedBlockHeight { .. }
431            | WorkerError::InvalidEpoch { .. }
432            | WorkerError::EventsNotFound(_)
433            | WorkerError::InvalidBlockChaining
434            | WorkerError::InvalidTimestamp { .. }
435            | WorkerError::MissingCertificateValue
436            | WorkerError::InvalidLiteCertificate
437            | WorkerError::FastBlockUsingOracles
438            | WorkerError::BlobsNotFound(_)
439            | WorkerError::InvalidBlockProposal(_)
440            | WorkerError::UnexpectedBlob
441            | WorkerError::TooManyPublishedBlobs(_)
442            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
443            WorkerError::BcsError(_)
444            | WorkerError::InvalidCrossChainRequest
445            | WorkerError::ViewError(_)
446            | WorkerError::ConfirmedLogEntryNotFound { .. }
447            | WorkerError::PreprocessedBlocksEntryNotFound { .. }
448            | WorkerError::ConfirmedBlockHashNotFound { .. }
449            | WorkerError::LocalBlockNotFound { .. }
450            | WorkerError::MissingNetworkDescription
451            | WorkerError::Thread(_)
452            | WorkerError::ReadCertificatesError(_)
453            | WorkerError::PoisonedWorker => true,
454            WorkerError::ChainError(chain_error) => chain_error.is_local(),
455        }
456    }
457
458    /// Returns the qualified error variant name for the `error_type` metric label,
459    /// e.g. `"WorkerError::UnexpectedBlockHeight"`.
460    ///
461    /// For `ChainError` variants, delegates to `ChainError::error_type()` to
462    /// surface the underlying error name rather than just `"ChainError"`.
463    pub fn error_type(&self) -> String {
464        match self {
465            WorkerError::ChainError(chain_error) => chain_error.error_type(),
466            other => {
467                let variant: &'static str = other.into();
468                format!("WorkerError::{variant}")
469            }
470        }
471    }
472
473    /// Returns `true` if this error indicates that the chain worker's in-memory
474    /// state may be inconsistent and must be evicted from the cache.
475    fn must_reload_view(&self) -> bool {
476        matches!(
477            self,
478            WorkerError::PoisonedWorker
479                | WorkerError::ViewError(ViewError::StoreError {
480                    must_reload_view: true,
481                    ..
482                })
483        )
484    }
485
486    /// Returns `true` if this error indicates that the chain's persisted state is
487    /// internally inconsistent, so the worker should consider resetting and
488    /// re-executing it from storage.
489    fn indicates_corrupted_chain_state(&self) -> bool {
490        matches!(
491            self,
492            WorkerError::ChainError(chain_error)
493                if matches!(chain_error.as_ref(), ChainError::CorruptedChainState(_))
494        )
495    }
496}
497
498impl From<ChainError> for WorkerError {
499    #[instrument(level = "trace", skip(chain_error))]
500    fn from(chain_error: ChainError) -> Self {
501        match chain_error {
502            ChainError::ExecutionError(execution_error, context) => match *execution_error {
503                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
504                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
505                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
506                    execution_error,
507                    context,
508                ))),
509            },
510            error => Self::ChainError(Box::new(error)),
511        }
512    }
513}
514
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] inside this error.
    ///
    /// # Panics
    ///
    /// Panics if this is not a chain error wrapping an [`ExecutionError`], or if the
    /// execution context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            unexpected => panic!("Expected an `ExecutionError`. Got: {unexpected:#?}"),
        };

        match *chain_error {
            ChainError::ExecutionError(execution_error, context) => {
                assert_eq!(context, expected_context);
                *execution_error
            }
            unexpected => panic!("Expected an `ExecutionError`. Got: {unexpected:#?}"),
        }
    }
}
536
537type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
538type ChainWorkerWeak<S> = std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>;
539type ChainWorkerFuture<S> = Shared<oneshot::Receiver<ChainWorkerWeak<S>>>;
540
541/// Each map entry is a `Shared<oneshot::Receiver<Weak<...>>>`:
542///
543/// - `peek()` returns `None` while a task is loading the worker from storage.
544/// - `peek()` returns `Some(Ok(weak))` once the worker is loaded.
545/// - `peek()` returns `Some(Err(_))` if loading failed (sender dropped).
546///
547/// Callers that find a pending entry clone the `Shared` future and await it.
548type ChainWorkerMap<S> = Arc<papaya::HashMap<ChainId, ChainWorkerFuture<S>>>;
549
550/// Starts a background task that periodically removes dead weak references
551/// from the chain handle map. The actual lifetime management is handled by
552/// each handle's keep-alive task.
553fn start_sweep<S: Storage + Clone + 'static>(
554    chain_workers: &ChainWorkerMap<S>,
555    config: &ChainWorkerConfig,
556) {
557    // Sweep at the smaller of the two TTLs. If both are None, workers
558    // live forever so there's nothing to sweep.
559    let interval = match (config.ttl, config.sender_chain_ttl) {
560        (None, None) => return,
561        (Some(d), None) | (None, Some(d)) => d,
562        (Some(a), Some(b)) => a.min(b),
563    };
564    let weak_map = Arc::downgrade(chain_workers);
565    linera_base::Task::spawn(async move {
566        loop {
567            linera_base::time::timer::sleep(interval).await;
568            let Some(map) = weak_map.upgrade() else {
569                break;
570            };
571            map.pin_owned().retain(|_, shared| match shared.peek() {
572                Some(Ok(weak)) => weak.strong_count() > 0,
573                Some(Err(_)) => false, // Loading failed; clean up.
574                None => true,          // Still loading; keep.
575            });
576        }
577    })
578    .forget();
579}
580
581/// State of a worker in a validator or a local node.
582pub struct WorkerState<StorageClient: Storage> {
583    /// Access to local persistent storage.
584    storage: StorageClient,
585    /// Configuration options for chain workers.
586    chain_worker_config: ChainWorkerConfig,
587    block_cache: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
588    execution_state_cache:
589        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
590    /// Chains tracked by a worker, along with their listening modes.
591    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
592    /// One-shot channels to notify callers when messages of a particular chain have been
593    /// delivered.
594    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
595    /// The cache of loaded chain workers. Stores weak references; each worker
596    /// manages its own lifetime via a keep-alive task. A background sweep
597    /// periodically removes dead entries.
598    chain_workers: ChainWorkerMap<StorageClient>,
599    /// Shard-routing dispatcher for outbound cross-chain requests. Used when we need
600    /// to send cross-chain requests outside of the normal `NetworkActions` return
601    /// path — in particular, the `RevertConfirm`s emitted after resetting a
602    /// corrupted chain. The RPC server layer installs this; without it, we fall
603    /// back to dispatching locally through `handle_cross_chain_request`.
604    outbound_cross_chain_sender: Option<OutboundCrossChainSender>,
605}
606
607/// Dispatcher for outbound cross-chain requests that handles the source-shard-to-
608/// target-shard routing that the worker itself is not aware of.
609pub type OutboundCrossChainSender = Arc<dyn Fn(CrossChainRequest) + Send + Sync>;
610
611impl<StorageClient> Clone for WorkerState<StorageClient>
612where
613    StorageClient: Storage + Clone,
614{
615    fn clone(&self) -> Self {
616        WorkerState {
617            storage: self.storage.clone(),
618            chain_worker_config: self.chain_worker_config.clone(),
619            block_cache: self.block_cache.clone(),
620            execution_state_cache: self.execution_state_cache.clone(),
621            chain_modes: self.chain_modes.clone(),
622            delivery_notifiers: self.delivery_notifiers.clone(),
623            chain_workers: self.chain_workers.clone(),
624            outbound_cross_chain_sender: self.outbound_cross_chain_sender.clone(),
625        }
626    }
627}
628
629pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
630
631impl<StorageClient> WorkerState<StorageClient>
632where
633    StorageClient: Storage,
634{
635    /// Returns an instance with the specified cross-chain message chunk limit.
636    #[cfg(with_testing)]
637    #[instrument(level = "trace", skip(self))]
638    pub fn with_cross_chain_message_chunk_limit(mut self, limit: usize) -> Self {
639        self.chain_worker_config.cross_chain_message_chunk_limit = limit;
640        self
641    }
642
643    /// Sets the cross-chain message chunk limit.
644    #[cfg(with_testing)]
645    pub fn set_cross_chain_message_chunk_limit(&mut self, limit: usize) {
646        self.chain_worker_config.cross_chain_message_chunk_limit = limit;
647    }
648
649    #[cfg(with_testing)]
650    #[instrument(level = "trace", skip(self, value))]
651    pub fn with_allow_revert_confirm(mut self, value: bool) -> Self {
652        self.chain_worker_config.allow_revert_confirm = value;
653        self
654    }
655
656    #[instrument(level = "trace", skip(self))]
657    pub fn nickname(&self) -> &str {
658        &self.chain_worker_config.nickname
659    }
660
661    /// Returns the storage client so that it can be manipulated or queried.
662    #[instrument(level = "trace", skip(self))]
663    #[cfg(not(feature = "test"))]
664    pub(crate) fn storage_client(&self) -> &StorageClient {
665        &self.storage
666    }
667
668    /// Returns the storage client so that it can be manipulated or queried by tests in other
669    /// crates.
670    #[instrument(level = "trace", skip(self))]
671    #[cfg(feature = "test")]
672    pub fn storage_client(&self) -> &StorageClient {
673        &self.storage
674    }
675
676    #[instrument(level = "trace", skip(self, certificate))]
677    pub(crate) async fn full_certificate(
678        &self,
679        certificate: LiteCertificate<'_>,
680    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
681        let block = self
682            .block_cache
683            .get(&certificate.value.value_hash)
684            .ok_or(WorkerError::MissingCertificateValue)?;
685        let block = Arc::unwrap_or_clone(block);
686
687        match certificate.value.kind {
688            linera_chain::types::CertificateKind::Confirmed => Ok(Either::Left(
689                certificate
690                    .with_value(block)
691                    .ok_or(WorkerError::InvalidLiteCertificate)?,
692            )),
693            linera_chain::types::CertificateKind::Validated => {
694                let value = ValidatedBlock::from_hashed(block.into_inner());
695                Ok(Either::Right(
696                    certificate
697                        .with_value(value)
698                        .ok_or(WorkerError::InvalidLiteCertificate)?,
699                ))
700            }
701            _ => Err(WorkerError::InvalidLiteCertificate),
702        }
703    }
704}
705
706#[allow(async_fn_in_trait)]
707#[cfg_attr(not(web), trait_variant::make(Send))]
708pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
709    async fn process_certificate<S: Storage + Clone + 'static>(
710        worker: &WorkerState<S>,
711        certificate: GenericCertificate<Self>,
712    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
713}
714
715impl ProcessableCertificate for ConfirmedBlock {
716    async fn process_certificate<S: Storage + Clone + 'static>(
717        worker: &WorkerState<S>,
718        certificate: ConfirmedBlockCertificate,
719    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
720        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
721    }
722}
723
724impl ProcessableCertificate for ValidatedBlock {
725    async fn process_certificate<S: Storage + Clone + 'static>(
726        worker: &WorkerState<S>,
727        certificate: ValidatedBlockCertificate,
728    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
729        Box::pin(worker.handle_validated_certificate(certificate)).await
730    }
731}
732
733impl ProcessableCertificate for Timeout {
734    async fn process_certificate<S: Storage + Clone + 'static>(
735        worker: &WorkerState<S>,
736        certificate: TimeoutCertificate,
737    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
738        worker.handle_timeout_certificate(certificate).await
739    }
740}
741
742impl<StorageClient> WorkerState<StorageClient>
743where
744    StorageClient: Storage + Clone + 'static,
745{
746    /// Creates a new `WorkerState`.
747    ///
748    /// The `chain_worker_config` must be fully configured before calling this, because the
749    /// TTL sweep task is started immediately based on the config's TTL values.
750    #[instrument(level = "trace", skip(storage, chain_worker_config))]
751    pub fn new(
752        storage: StorageClient,
753        chain_worker_config: ChainWorkerConfig,
754        chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
755    ) -> Self {
756        let chain_workers = Arc::new(papaya::HashMap::new());
757        start_sweep(&chain_workers, &chain_worker_config);
758        let block_cache_size = chain_worker_config.block_cache_size;
759        let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
760        WorkerState {
761            storage,
762            chain_worker_config,
763            block_cache: Arc::new(ValueCache::new(
764                block_cache_size,
765                DEFAULT_CLEANUP_INTERVAL_SECS,
766            )),
767            execution_state_cache: (execution_state_cache_size > 0)
768                .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))),
769            chain_modes,
770            delivery_notifiers: Arc::default(),
771            chain_workers,
772            outbound_cross_chain_sender: None,
773        }
774    }
775
776    /// Installs a shard-routing dispatcher used for outbound cross-chain requests
777    /// generated outside the normal response path (specifically, after resetting a
778    /// corrupted chain). Without it, such requests are dispatched locally in a
779    /// loop via `handle_cross_chain_request`.
780    pub fn with_outbound_cross_chain_sender(mut self, sender: OutboundCrossChainSender) -> Self {
781        self.outbound_cross_chain_sender = Some(sender);
782        self
783    }
784
785    #[instrument(level = "trace", skip(self, certificate, notifier))]
786    #[inline]
787    pub async fn fully_handle_certificate_with_notifications<T>(
788        &self,
789        certificate: GenericCertificate<T>,
790        notifier: &impl Notifier,
791    ) -> Result<ChainInfoResponse, WorkerError>
792    where
793        T: ProcessableCertificate,
794    {
795        let notifications = (*notifier).clone();
796        let this = self.clone();
797        linera_base::Task::spawn(async move {
798            let (response, actions) =
799                ProcessableCertificate::process_certificate(&this, certificate).await?;
800            notifications.notify(&actions.notifications);
801            let mut requests = VecDeque::from(actions.cross_chain_requests);
802            while let Some(request) = requests.pop_front() {
803                let actions = this.handle_cross_chain_request(request).await?;
804                requests.extend(actions.cross_chain_requests);
805                notifications.notify(&actions.notifications);
806            }
807            Ok(response)
808        })
809        .await
810    }
811
812    /// Acquires a read lock on the chain worker and executes the given closure.
813    ///
814    /// The future is boxed to keep deeply nested types off the stack. On non-web
815    /// targets it is also wrapped in `SyncFuture` to satisfy `Sync` bounds.
816    async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
817    where
818        F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
819        Fut: std::future::Future<Output = Result<R, WorkerError>>,
820    {
821        let state = self.get_or_create_chain_worker(chain_id).await?;
822        let state_ref = &state;
823        let result = Box::pin(wrap_future(async move {
824            let guard = handle::read_lock(state_ref).await;
825            guard.check_not_poisoned()?;
826            f(guard).await
827        }))
828        .await;
829        if let Err(error) = &result {
830            if error.must_reload_view() {
831                self.evict_poisoned_worker(chain_id, &state);
832            }
833        }
834        result
835    }
836
    /// Acquires a write lock on the chain worker and executes the given closure.
    ///
    /// The write work runs on a detached task (via [`linera_base::task::run_detached`])
    /// so that caller cancellation does not unwind the task mid-save. The
    /// [`RollbackGuard`] lives inside the detached task, so the write lock is held
    /// until the DB round-trip and `post_save` have fully completed — subsequent
    /// readers, including a freshly-loaded replacement worker, only see the
    /// committed state.
    ///
    /// Before `f` runs, the worker is checked for poisoning. On error, recovery is
    /// applied to the same worker instance that was used:
    /// - an error that requires reloading the view evicts the worker from the cache;
    /// - otherwise, an error indicating corrupted chain state spawns a background reset
    ///   through `spawn_reset_corrupted_chain_state`.
    ///
    /// In both cases the original error is still returned to the caller.
    async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
    where
        F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut
            + linera_base::task::MaybeSend
            + 'static,
        Fut: std::future::Future<Output = Result<R, WorkerError>> + linera_base::task::MaybeSend,
        R: linera_base::task::MaybeSend + 'static,
    {
        let state = self.get_or_create_chain_worker(chain_id).await?;
        // One handle moves into the detached task; `state` stays here so the error
        // handling below targets the exact worker instance that produced the error.
        let state_for_task = state.clone();
        let result = Box::pin(wrap_future(linera_base::task::run_detached(async move {
            let guard = handle::write_lock(&state_for_task).await;
            guard.check_not_poisoned()?;
            f(guard).await
        })))
        .await;
        if let Err(error) = &result {
            // Eviction takes precedence: a reset is only attempted for errors that do
            // not already require reloading the view.
            if error.must_reload_view() {
                self.evict_poisoned_worker(chain_id, &state);
            } else if error.indicates_corrupted_chain_state() {
                self.spawn_reset_corrupted_chain_state(chain_id, state);
            }
        }
        result
    }
870
871    /// Spawns a detached task that re-acquires the write lock and recovers the
872    /// chain from a detected state corruption. Running the recovery in a separate
873    /// task ensures it survives cancellation of the originating request: if the
874    /// caller's future is dropped mid-way through re-execution, the chain would
875    /// otherwise be left at a partial tip and our safety snapshot would be lost.
876    /// Generated `RevertConfirm` requests are dispatched via the installed
877    /// shard-routing sender when present (sharded validators), or locally
878    /// through `handle_cross_chain_request` otherwise (client nodes and tests).
879    /// Errors are logged; the caller already has the original error to
880    /// propagate.
881    fn spawn_reset_corrupted_chain_state(
882        &self,
883        chain_id: ChainId,
884        state: ChainWorkerArc<StorageClient>,
885    ) where
886        StorageClient: Clone,
887    {
888        let this = self.clone();
889        linera_base::Task::spawn(async move {
890            let requests = {
891                let mut guard = handle::write_lock(&state).await;
892                match guard.maybe_reset_corrupted_chain_state().await {
893                    Ok(Some(requests)) => requests,
894                    Ok(None) => return,
895                    Err(error) => {
896                        tracing::error!(
897                            %chain_id, %error, "Failed to reset corrupted chain state"
898                        );
899                        return;
900                    }
901                }
902            };
903            if let Some(sender) = &this.outbound_cross_chain_sender {
904                // Sharded validator path: let the RPC layer route each request to
905                // the shard that owns the target chain.
906                for request in requests {
907                    sender(request);
908                }
909            } else {
910                // No routing dispatcher is installed (client node or test), so all
911                // involved chains are co-located on this worker. Dispatch locally
912                // in a loop, following any cascading cross-chain requests.
913                let mut queue = VecDeque::from(requests);
914                while let Some(request) = queue.pop_front() {
915                    match this.handle_cross_chain_request(request).await {
916                        Ok(actions) => queue.extend(actions.cross_chain_requests),
917                        Err(error) => {
918                            warn!(
919                                %chain_id, %error,
920                                "Failed to dispatch cross-chain request after \
921                                resetting corrupted chain state"
922                            );
923                        }
924                    }
925                }
926            }
927        })
928        .forget();
929    }
930
931    /// Evicts a poisoned chain worker from the cache, but only if the entry still
932    /// points to the same instance. This avoids removing a fresh replacement that
933    /// another task may have already loaded.
934    fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
935        tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
936        let pin = self.chain_workers.pin();
937        let weak_poisoned = Arc::downgrade(poisoned);
938        let removed = pin.remove_if(&chain_id, |_key, future| {
939            future
940                .peek()
941                .and_then(|r| r.clone().ok())
942                .is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
943        });
944        if removed.is_err() {
945            tracing::trace!(%chain_id, "Poisoned worker entry already replaced; skipping eviction");
946        }
947    }
948
    /// Gets or creates a chain worker for the given chain.
    ///
    /// Each map entry is a shared future that resolves to a `Weak` handle to the
    /// worker (the strong `Arc` is returned to callers; only the `Weak` is stored).
    /// For the entry of `chain_id`, the cases are:
    /// - resolved to a `Weak` that still upgrades: return the live worker;
    /// - resolved to a dead `Weak`, or to an error (the loader dropped its sender),
    ///   or no entry at all: claim the slot by inserting our own receiver and load
    ///   the worker from storage;
    /// - not yet resolved (another task is loading): await that task's result, and
    ///   retry from the top if it produced nothing usable.
    ///
    /// An error is returned only when this task's own load fails.
    ///
    /// The oneshot channel is created outside the `compute` closure to keep
    /// the closure pure (papaya may call it more than once on CAS retry and
    /// may memoize the output). If the fast path hits, the unused channel is
    /// dropped harmlessly.
    ///
    /// Returns a type-erased future to keep `!Sync` intermediate types (e.g.
    /// `std::sync::mpsc::Receiver` from `handle::ServiceRuntimeActor::spawn`) out of
    /// the caller's future type.
    fn get_or_create_chain_worker(
        &self,
        chain_id: ChainId,
    ) -> std::pin::Pin<
        Box<
            impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
        >,
    > {
        Box::pin(wrap_future(async move {
            loop {
                // Create the channel outside the closure so that the tx/rx
                // always match regardless of CAS retries.
                let (tx, rx) = oneshot::channel();
                let shared_rx = rx.shared();

                // The papaya guard is !Send, so it must be dropped before
                // any .await point.
                let wait_or_tx = {
                    let pin = self.chain_workers.pin();
                    match pin.compute(chain_id, |existing| match existing {
                        Some((_, entry)) => match entry.peek() {
                            // Loaded earlier: reuse it if it is still alive.
                            Some(Ok(weak)) => match weak.upgrade() {
                                Some(arc) => papaya::Operation::Abort(Ok(arc)),
                                None => papaya::Operation::Insert(shared_rx.clone()),
                            },
                            // The previous loader dropped its sender: take over.
                            Some(Err(_)) => papaya::Operation::Insert(shared_rx.clone()),
                            // Still loading elsewhere: hand back its future to await.
                            None => papaya::Operation::Abort(Err(entry.clone())),
                        },
                        None => papaya::Operation::Insert(shared_rx.clone()),
                    }) {
                        papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc),
                        papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait),
                        papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => {
                            Either::Right(tx)
                        }
                        // The closure above never returns `Operation::Remove`.
                        papaya::Compute::Removed { .. } => unreachable!(),
                    }
                };

                match wait_or_tx {
                    Either::Left(wait) => {
                        // Another task is loading. Await the shared future.
                        if let Ok(weak) = wait.await {
                            if let Some(arc) = weak.upgrade() {
                                return Ok(arc);
                            }
                        }
                        // Loading failed or worker already dead; retry.
                    }
                    Either::Right(tx) => {
                        // We claimed the loading slot. Load from storage.
                        // On success, send the Weak through the channel.
                        // On error, dropping tx wakes waiters so they can retry.
                        let worker = self.load_chain_worker(chain_id).await?;
                        if tx.send(Arc::downgrade(&worker)).is_err() {
                            tracing::error!(%chain_id, "Receiver dropped while loading worker state.");
                            continue;
                        }
                        return Ok(worker);
                    }
                }
            }
        }))
    }
1023
1024    /// Loads a chain worker state from storage and wraps it in an Arc.
1025    async fn load_chain_worker(
1026        &self,
1027        chain_id: ChainId,
1028    ) -> Result<ChainWorkerArc<StorageClient>, WorkerError> {
1029        let delivery_notifier = self
1030            .delivery_notifiers
1031            .lock()
1032            .unwrap()
1033            .entry(chain_id)
1034            .or_default()
1035            .clone();
1036
1037        // `chain_modes=None` means "no tracked/sender distinction" (validators) — treat
1038        // every chain as tracked so it routes through `config.ttl`, not the unset
1039        // `config.sender_chain_ttl` which would skip `spawn_keep_alive` entirely.
1040        let is_tracked = self.chain_modes.as_ref().is_none_or(|chain_modes| {
1041            chain_modes
1042                .read()
1043                .unwrap()
1044                .get(&chain_id)
1045                .is_some_and(ListeningMode::is_full)
1046        });
1047
1048        let (service_runtime_endpoint, service_runtime_task) =
1049            if self.chain_worker_config.long_lived_services {
1050                let actor =
1051                    handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
1052                (Some(actor.endpoint), Some(actor.task))
1053            } else {
1054                (None, None)
1055            };
1056
1057        let state = crate::chain_worker::state::ChainWorkerState::load(
1058            self.chain_worker_config.clone(),
1059            self.storage.clone(),
1060            self.block_cache.clone(),
1061            self.execution_state_cache.clone(),
1062            self.chain_modes.clone(),
1063            delivery_notifier,
1064            chain_id,
1065            service_runtime_endpoint,
1066            service_runtime_task,
1067        )
1068        .await?;
1069
1070        Ok(handle::create_chain_worker(
1071            state,
1072            is_tracked,
1073            &self.chain_worker_config,
1074        ))
1075    }
1076
1077    /// Tries to execute a block proposal with a policy for handling bundle failures.
1078    ///
1079    /// Returns the modified block (bundles may be rejected/removed), the executed block,
1080    /// chain info response, and resource tracker.
1081    #[instrument(level = "trace", skip(self, block))]
1082    pub async fn stage_block_execution(
1083        &self,
1084        block: ProposedBlock,
1085        round: Option<u32>,
1086        published_blobs: Vec<Blob>,
1087        policy: BundleExecutionPolicy,
1088    ) -> Result<
1089        (
1090            ProposedBlock,
1091            Block,
1092            ChainInfoResponse,
1093            ResourceTracker,
1094            HashSet<ChainId>,
1095        ),
1096        WorkerError,
1097    > {
1098        let chain_id = block.chain_id;
1099        self.chain_write(chain_id, move |mut guard| async move {
1100            guard
1101                .stage_block_execution(block, round, &published_blobs, policy)
1102                .await
1103        })
1104        .await
1105    }
1106
1107    /// Executes a [`Query`] for an application's state on a specific chain.
1108    ///
1109    /// If `block_hash` is specified, system will query the application's state
1110    /// at that block. If it doesn't exist, it uses latest state.
1111    #[instrument(level = "trace", skip(self, chain_id, query))]
1112    pub async fn query_application(
1113        &self,
1114        chain_id: ChainId,
1115        query: Query,
1116        block_hash: Option<CryptoHash>,
1117    ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
1118        self.chain_write(chain_id, move |mut guard| async move {
1119            guard.query_application(query, block_hash).await
1120        })
1121        .await
1122    }
1123
1124    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
1125        nickname = %self.nickname(),
1126        chain_id = %chain_id,
1127        application_id = %application_id
1128    ))]
1129    pub async fn describe_application(
1130        &self,
1131        chain_id: ChainId,
1132        application_id: ApplicationId,
1133    ) -> Result<ApplicationDescription, WorkerError> {
1134        let state = self.get_or_create_chain_worker(chain_id).await?;
1135        let guard = handle::read_lock_initialized(&state).await?;
1136        guard.describe_application_readonly(application_id).await
1137    }
1138
1139    /// Processes a confirmed block (aka a commit).
1140    #[instrument(
1141        level = "trace",
1142        skip(self, certificate, notify_when_messages_are_delivered),
1143        fields(
1144            nickname = %self.nickname(),
1145            chain_id = %certificate.block().header.chain_id,
1146            block_height = %certificate.block().header.height
1147        )
1148    )]
1149    async fn process_confirmed_block(
1150        &self,
1151        certificate: ConfirmedBlockCertificate,
1152        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1153    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1154        let chain_id = certificate.block().header.chain_id;
1155        self.chain_write(chain_id, move |mut guard| async move {
1156            guard
1157                .process_confirmed_block(certificate, notify_when_messages_are_delivered)
1158                .await
1159        })
1160        .await
1161    }
1162
1163    /// Processes a validated block issued from a multi-owner chain.
1164    #[instrument(level = "trace", skip(self, certificate), fields(
1165        nickname = %self.nickname(),
1166        chain_id = %certificate.block().header.chain_id,
1167        block_height = %certificate.block().header.height
1168    ))]
1169    async fn process_validated_block(
1170        &self,
1171        certificate: ValidatedBlockCertificate,
1172    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1173        let chain_id = certificate.block().header.chain_id;
1174        self.chain_write(chain_id, move |mut guard| async move {
1175            guard.process_validated_block(certificate).await
1176        })
1177        .await
1178    }
1179
1180    /// Processes a leader timeout issued from a multi-owner chain.
1181    #[instrument(level = "trace", skip(self, certificate), fields(
1182        nickname = %self.nickname(),
1183        chain_id = %certificate.value().chain_id(),
1184        height = %certificate.value().height()
1185    ))]
1186    async fn process_timeout(
1187        &self,
1188        certificate: TimeoutCertificate,
1189    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1190        let chain_id = certificate.value().chain_id();
1191        self.chain_write(chain_id, move |mut guard| async move {
1192            guard.process_timeout(certificate).await
1193        })
1194        .await
1195    }
1196
1197    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
1198        nickname = %self.nickname(),
1199        origin = %origin,
1200        recipient = %recipient,
1201        num_bundles = %bundles.len()
1202    ))]
1203    async fn process_cross_chain_update(
1204        &self,
1205        origin: ChainId,
1206        recipient: ChainId,
1207        bundles: Vec<(Epoch, MessageBundle)>,
1208        previous_height: Option<BlockHeight>,
1209    ) -> Result<CrossChainUpdateResult, WorkerError> {
1210        self.chain_write(recipient, move |mut guard| async move {
1211            guard
1212                .process_cross_chain_update(origin, bundles, previous_height)
1213                .await
1214        })
1215        .await
1216    }
1217
1218    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
1219    #[instrument(level = "trace", skip(self, chain_id, height), fields(
1220        nickname = %self.nickname(),
1221        chain_id = %chain_id,
1222        height = %height
1223    ))]
1224    #[cfg(with_testing)]
1225    pub async fn read_certificate(
1226        &self,
1227        chain_id: ChainId,
1228        height: BlockHeight,
1229    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, WorkerError> {
1230        let state = self.get_or_create_chain_worker(chain_id).await?;
1231        let guard = handle::read_lock_initialized(&state).await?;
1232        guard.read_certificate(height).await
1233    }
1234
1235    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
1236    /// [`ChainId`].
1237    ///
1238    /// The returned guard holds a read lock on the chain state, preventing writes for
1239    /// its lifetime. Multiple concurrent readers are allowed.
1240    #[instrument(level = "trace", skip(self), fields(
1241        nickname = %self.nickname(),
1242        chain_id = %chain_id
1243    ))]
1244    pub async fn chain_state_view(
1245        &self,
1246        chain_id: ChainId,
1247    ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1248        let state = self.get_or_create_chain_worker(chain_id).await?;
1249        let guard = handle::read_lock(&state).await;
1250        Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1251            guard,
1252            |s| s.chain(),
1253        )))
1254    }
1255
1256    #[instrument(skip_all, fields(
1257        nick = self.nickname(),
1258        chain_id = format!("{:.8}", proposal.content.block.chain_id),
1259        height = %proposal.content.block.height,
1260    ))]
1261    pub async fn handle_block_proposal(
1262        &self,
1263        proposal: BlockProposal,
1264    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1265        trace!("{} <-- {:?}", self.nickname(), proposal);
1266        #[cfg(with_metrics)]
1267        let round = proposal.content.round;
1268
1269        let chain_id = proposal.content.block.chain_id;
1270        // Delay if block timestamp is in the future but within grace period.
1271        let now = self.storage.clock().current_time();
1272        let block_timestamp = proposal.content.block.timestamp;
1273        let delta = block_timestamp.delta_since(now);
1274        let grace_period = TimeDelta::from_micros(
1275            u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1276                .unwrap_or(u64::MAX),
1277        );
1278        if delta > TimeDelta::ZERO && delta <= grace_period {
1279            self.storage.clock().sleep_until(block_timestamp).await;
1280        }
1281
1282        let response = self
1283            .chain_write(chain_id, move |mut guard| async move {
1284                guard.handle_block_proposal(proposal).await
1285            })
1286            .await?;
1287        #[cfg(with_metrics)]
1288        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1289            .with_label_values(&[round.type_name()])
1290            .observe(round.number() as f64);
1291        Ok(response)
1292    }
1293
1294    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
1295    // Other fields will be included in the caller's span.
1296    #[instrument(skip_all, fields(
1297        chain_id = %certificate.value.chain_id,
1298        hash = %certificate.value.value_hash,
1299    ))]
1300    pub async fn handle_lite_certificate(
1301        &self,
1302        certificate: LiteCertificate<'_>,
1303        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1304    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1305        match self.full_certificate(certificate).await? {
1306            Either::Left(confirmed) => {
1307                Box::pin(
1308                    self.handle_confirmed_certificate(
1309                        confirmed,
1310                        notify_when_messages_are_delivered,
1311                    ),
1312                )
1313                .await
1314            }
1315            Either::Right(validated) => {
1316                if let Some(notifier) = notify_when_messages_are_delivered {
1317                    // Nothing to wait for.
1318                    if let Err(()) = notifier.send(()) {
1319                        warn!("Failed to notify message delivery to caller");
1320                    }
1321                }
1322                Box::pin(self.handle_validated_certificate(validated)).await
1323            }
1324        }
1325    }
1326
1327    /// Processes a confirmed block certificate.
1328    #[instrument(skip_all, fields(
1329        nick = self.nickname(),
1330        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1331        height = %certificate.block().header.height,
1332    ))]
1333    pub async fn handle_confirmed_certificate(
1334        &self,
1335        certificate: ConfirmedBlockCertificate,
1336        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1337    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1338        trace!("{} <-- {:?}", self.nickname(), certificate);
1339        #[cfg(with_metrics)]
1340        let metrics_data = metrics::MetricsData::new(&certificate);
1341
1342        #[allow(unused_variables)]
1343        let (info, actions, outcome) =
1344            Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1345                .await?;
1346
1347        #[cfg(with_metrics)]
1348        if matches!(outcome, BlockOutcome::Processed) {
1349            metrics_data.record();
1350        }
1351        Ok((info, actions))
1352    }
1353
1354    /// Processes a validated block certificate.
1355    #[instrument(skip_all, fields(
1356        nick = self.nickname(),
1357        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1358        height = %certificate.block().header.height,
1359    ))]
1360    pub async fn handle_validated_certificate(
1361        &self,
1362        certificate: ValidatedBlockCertificate,
1363    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1364        trace!("{} <-- {:?}", self.nickname(), certificate);
1365
1366        #[cfg(with_metrics)]
1367        let round = certificate.round;
1368        #[cfg(with_metrics)]
1369        let cert_str = certificate.inner().to_log_str();
1370
1371        #[allow(unused_variables)]
1372        let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1373        #[cfg(with_metrics)]
1374        {
1375            if matches!(outcome, BlockOutcome::Processed) {
1376                metrics::NUM_ROUNDS_IN_CERTIFICATE
1377                    .with_label_values(&[cert_str, round.type_name()])
1378                    .observe(round.number() as f64);
1379            }
1380        }
1381        Ok((info, actions))
1382    }
1383
1384    /// Processes a timeout certificate
1385    #[instrument(skip_all, fields(
1386        nick = self.nickname(),
1387        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1388        height = %certificate.inner().height(),
1389    ))]
1390    pub async fn handle_timeout_certificate(
1391        &self,
1392        certificate: TimeoutCertificate,
1393    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1394        trace!("{} <-- {:?}", self.nickname(), certificate);
1395        self.process_timeout(certificate).await
1396    }
1397
1398    #[instrument(skip_all, fields(
1399        nick = self.nickname(),
1400        chain_id = format!("{:.8}", query.chain_id)
1401    ))]
1402    pub async fn handle_chain_info_query(
1403        &self,
1404        query: ChainInfoQuery,
1405    ) -> Result<ChainInfoResponse, WorkerError> {
1406        trace!("{} <-- {:?}", self.nickname(), query);
1407        #[cfg(with_metrics)]
1408        metrics::CHAIN_INFO_QUERIES.inc();
1409        let chain_id = query.chain_id;
1410        let result = self
1411            .chain_write(chain_id, move |mut guard| async move {
1412                guard.handle_chain_info_query(query).await
1413            })
1414            .await;
1415        trace!("{} --> {:?}", self.nickname(), result);
1416        result
1417    }
1418
1419    #[instrument(skip_all, fields(
1420        nick = self.nickname(),
1421        chain_id = format!("{:.8}", chain_id)
1422    ))]
1423    pub async fn download_pending_blob(
1424        &self,
1425        chain_id: ChainId,
1426        blob_id: BlobId,
1427    ) -> Result<Arc<Blob>, WorkerError> {
1428        trace!("{} <-- download_pending_blob({blob_id:8})", self.nickname());
1429        let result = self
1430            .chain_read(chain_id, |guard| async move {
1431                guard.download_pending_blob(blob_id).await
1432            })
1433            .await;
1434        trace!(
1435            "{} --> {:?}",
1436            self.nickname(),
1437            result.as_ref().map(|_| blob_id)
1438        );
1439        result
1440    }
1441
1442    #[instrument(skip_all, fields(
1443        nick = self.nickname(),
1444        chain_id = format!("{:.8}", chain_id)
1445    ))]
1446    pub async fn handle_pending_blob(
1447        &self,
1448        chain_id: ChainId,
1449        blob: Blob,
1450    ) -> Result<ChainInfoResponse, WorkerError> {
1451        let blob_id = blob.id();
1452        trace!("{} <-- handle_pending_blob({blob_id:8})", self.nickname());
1453        let result = self
1454            .chain_write(chain_id, move |mut guard| async move {
1455                guard.handle_pending_blob(blob).await
1456            })
1457            .await;
1458        trace!(
1459            "{} --> {:?}",
1460            self.nickname(),
1461            result.as_ref().map(|_| blob_id)
1462        );
1463        result
1464    }
1465
1466    #[instrument(skip_all, fields(
1467        nick = self.nickname(),
1468        chain_id = format!("{:.8}", request.target_chain_id())
1469    ))]
1470    pub async fn handle_cross_chain_request(
1471        &self,
1472        request: CrossChainRequest,
1473    ) -> Result<NetworkActions, WorkerError> {
1474        trace!("{} <-- {:?}", self.nickname(), request);
1475        match request {
1476            CrossChainRequest::UpdateRecipient {
1477                sender,
1478                recipient,
1479                bundles,
1480                previous_height,
1481            } => {
1482                let mut actions = NetworkActions::default();
1483                let origin = sender;
1484                match self
1485                    .process_cross_chain_update(origin, recipient, bundles, previous_height)
1486                    .await?
1487                {
1488                    CrossChainUpdateResult::NothingToDo => {}
1489                    CrossChainUpdateResult::Updated(height) => {
1490                        actions.notifications.push(Notification {
1491                            chain_id: recipient,
1492                            reason: Reason::NewIncomingBundle { origin, height },
1493                        });
1494                        actions.cross_chain_requests.push(
1495                            CrossChainRequest::ConfirmUpdatedRecipient {
1496                                sender,
1497                                recipient,
1498                                latest_height: height,
1499                            },
1500                        );
1501                    }
1502                    CrossChainUpdateResult::GapDetected {
1503                        origin,
1504                        retransmit_from,
1505                    } => {
1506                        actions
1507                            .cross_chain_requests
1508                            .push(CrossChainRequest::RevertConfirm {
1509                                sender: origin,
1510                                recipient,
1511                                retransmit_from,
1512                            });
1513                    }
1514                }
1515                Ok(actions)
1516            }
1517            CrossChainRequest::ConfirmUpdatedRecipient {
1518                sender,
1519                recipient,
1520                latest_height,
1521            } => {
1522                let actions = self
1523                    .chain_write(sender, move |mut guard| async move {
1524                        guard
1525                            .confirm_updated_recipient(recipient, latest_height)
1526                            .await
1527                    })
1528                    .await?;
1529                Ok(actions)
1530            }
1531            CrossChainRequest::RevertConfirm {
1532                sender,
1533                recipient,
1534                retransmit_from,
1535            } => {
1536                self.chain_write(sender, move |mut guard| async move {
1537                    guard
1538                        .handle_revert_confirm(recipient, retransmit_from)
1539                        .await
1540                })
1541                .await
1542            }
1543        }
1544    }
1545
1546    /// Updates the received certificate trackers to at least the given values.
1547    #[instrument(skip_all, fields(
1548        nickname = %self.nickname(),
1549        chain_id = %chain_id,
1550        num_trackers = %new_trackers.len()
1551    ))]
1552    pub async fn update_received_certificate_trackers(
1553        &self,
1554        chain_id: ChainId,
1555        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1556    ) -> Result<(), WorkerError> {
1557        self.chain_write(chain_id, move |mut guard| async move {
1558            guard
1559                .update_received_certificate_trackers(new_trackers)
1560                .await
1561        })
1562        .await
1563    }
1564
1565    /// Gets preprocessed block hashes in a given height range.
1566    #[instrument(skip_all, fields(
1567        nickname = %self.nickname(),
1568        chain_id = %chain_id,
1569        start = %start,
1570        end = %end
1571    ))]
1572    pub async fn get_preprocessed_block_hashes(
1573        &self,
1574        chain_id: ChainId,
1575        start: BlockHeight,
1576        end: BlockHeight,
1577    ) -> Result<Vec<CryptoHash>, WorkerError> {
1578        self.chain_read(chain_id, |guard| async move {
1579            guard.get_preprocessed_block_hashes(start, end).await
1580        })
1581        .await
1582    }
1583
1584    /// Gets the next block height to receive from an inbox.
1585    #[instrument(skip_all, fields(
1586        nickname = %self.nickname(),
1587        chain_id = %chain_id,
1588        origin = %origin
1589    ))]
1590    pub async fn get_inbox_next_height(
1591        &self,
1592        chain_id: ChainId,
1593        origin: ChainId,
1594    ) -> Result<BlockHeight, WorkerError> {
1595        self.chain_read(chain_id, |guard| async move {
1596            guard.get_inbox_next_height(origin).await
1597        })
1598        .await
1599    }
1600
1601    /// Gets locking blobs for specific blob IDs.
1602    /// Returns `Ok(None)` if any of the blobs is not found.
1603    #[instrument(skip_all, fields(
1604        nickname = %self.nickname(),
1605        chain_id = %chain_id,
1606        num_blob_ids = %blob_ids.len()
1607    ))]
1608    pub async fn get_locking_blobs(
1609        &self,
1610        chain_id: ChainId,
1611        blob_ids: Vec<BlobId>,
1612    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1613        self.chain_read(chain_id, |guard| async move {
1614            guard.get_locking_blobs(blob_ids).await
1615        })
1616        .await
1617    }
1618
1619    /// Gets block hashes for the given heights.
1620    pub async fn get_block_hashes(
1621        &self,
1622        chain_id: ChainId,
1623        heights: Vec<BlockHeight>,
1624    ) -> Result<Vec<CryptoHash>, WorkerError> {
1625        self.chain_read(chain_id, |guard| async move {
1626            guard.get_block_hashes(heights).await
1627        })
1628        .await
1629    }
1630
1631    /// Gets proposed blobs from the manager for specified blob IDs.
1632    pub async fn get_proposed_blobs(
1633        &self,
1634        chain_id: ChainId,
1635        blob_ids: Vec<BlobId>,
1636    ) -> Result<Vec<Blob>, WorkerError> {
1637        self.chain_read(chain_id, |guard| async move {
1638            guard.get_proposed_blobs(blob_ids).await
1639        })
1640        .await
1641    }
1642
1643    /// Gets event subscriptions from the chain.
1644    pub async fn get_event_subscriptions(
1645        &self,
1646        chain_id: ChainId,
1647    ) -> Result<EventSubscriptionsResult, WorkerError> {
1648        self.chain_read(chain_id, |guard| async move {
1649            guard.get_event_subscriptions().await
1650        })
1651        .await
1652    }
1653
1654    /// Gets the next expected event index for a stream.
1655    pub async fn get_next_expected_event(
1656        &self,
1657        chain_id: ChainId,
1658        stream_id: StreamId,
1659    ) -> Result<Option<u32>, WorkerError> {
1660        self.chain_read(chain_id, |guard| async move {
1661            guard.get_next_expected_event(stream_id).await
1662        })
1663        .await
1664    }
1665
1666    /// Gets the `next_expected_events` indices for the given streams.
1667    pub async fn next_expected_events(
1668        &self,
1669        chain_id: ChainId,
1670        stream_ids: Vec<StreamId>,
1671    ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1672        self.chain_read(chain_id, |guard| async move {
1673            guard.get_next_expected_events(stream_ids).await
1674        })
1675        .await
1676    }
1677
1678    /// Gets received certificate trackers.
1679    pub async fn get_received_certificate_trackers(
1680        &self,
1681        chain_id: ChainId,
1682    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1683        self.chain_read(chain_id, |guard| async move {
1684            guard.get_received_certificate_trackers().await
1685        })
1686        .await
1687    }
1688
1689    /// Returns the pending cross-chain network actions for this chain without
1690    /// initializing its execution state. Safe to call on chains whose
1691    /// `ChainDescription` blob is not available locally.
1692    pub async fn cross_chain_network_actions(
1693        &self,
1694        chain_id: ChainId,
1695    ) -> Result<NetworkActions, WorkerError> {
1696        self.chain_read(chain_id, |guard| async move {
1697            guard.cross_chain_network_actions().await
1698        })
1699        .await
1700    }
1701
1702    /// Gets tip state and outbox info for next_outbox_heights calculation.
1703    pub async fn get_tip_state_and_outbox_info(
1704        &self,
1705        chain_id: ChainId,
1706        receiver_id: ChainId,
1707    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1708        self.chain_read(chain_id, |guard| async move {
1709            guard.get_tip_state_and_outbox_info(receiver_id).await
1710        })
1711        .await
1712    }
1713
1714    /// Gets the next height to preprocess.
1715    pub async fn get_next_height_to_preprocess(
1716        &self,
1717        chain_id: ChainId,
1718    ) -> Result<BlockHeight, WorkerError> {
1719        self.chain_read(chain_id, |guard| async move {
1720            guard.get_next_height_to_preprocess().await
1721        })
1722        .await
1723    }
1724}
1725
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage + Clone + 'static,
{
    /// Returns the [`ValidatorPublicKey`] of this test validator, taken from
    /// the key pair in the chain worker configuration.
    ///
    /// # Panics
    ///
    /// Panics if no key pair has been configured for the validator.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain its public key",
        );
        key_pair.public()
    }
}