linera_core/
worker.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7    sync::{Arc, Mutex, RwLock},
8    time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14    data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15    doc_scalar,
16    hashed::Hashed,
17    identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
18};
19#[cfg(with_testing)]
20use linera_chain::ChainExecutionContext;
21use linera_chain::{
22    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
23    types::{
24        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
25        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
26    },
27    ChainError, ChainStateView,
28};
29use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
30use linera_storage::Storage;
31use linera_views::{context::InactiveContext, ViewError};
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
35use tracing::{error, instrument, trace, warn};
36
37use crate::{
38    chain_worker::{
39        BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
40    },
41    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
42    join_set_ext::{JoinSet, JoinSetExt},
43    notifier::Notifier,
44    value_cache::ValueCache,
45    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
46};
47
// Unit tests for the worker, kept in a separate file under `unit_tests/`.
#[cfg(test)]
#[path = "unit_tests/worker_tests.rs"]
mod worker_tests;
51
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Histogram `num_rounds_in_certificate`, labeled by `certificate_value` and `round_type`.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(0.1, 50.0);
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            buckets,
        )
    });

    /// Histogram `num_rounds_in_block_proposal`, labeled by `round_type`.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(0.1, 50.0);
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            buckets,
        )
    });

    /// Counter vector `transaction_count`, registered without labels.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("transaction_count", "Transaction count", &[])
    });

    /// Counter vector `num_blocks`, registered without labels.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Counter vector `certificates_signed`, labeled by `validator_name`.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let labels = &["validator_name"];
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            labels,
        )
    });

    /// Plain counter `chain_info_queries`.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });
}
102
103/// Instruct the networking layer to send cross-chain requests and/or push notifications.
104#[derive(Default, Debug)]
105pub struct NetworkActions {
106    /// The cross-chain requests
107    pub cross_chain_requests: Vec<CrossChainRequest>,
108    /// The push notifications.
109    pub notifications: Vec<Notification>,
110}
111
112impl NetworkActions {
113    pub fn extend(&mut self, other: NetworkActions) {
114        self.cross_chain_requests.extend(other.cross_chain_requests);
115        self.notifications.extend(other.notifications);
116    }
117}
118
119#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
120/// Notification that a chain has a new certified block or a new message.
121pub struct Notification {
122    pub chain_id: ChainId,
123    pub reason: Reason,
124}
125
126doc_scalar!(
127    Notification,
128    "Notify that a chain has a new certified block or a new message"
129);
130
131#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
132/// Reason for the notification.
133pub enum Reason {
134    NewBlock {
135        height: BlockHeight,
136        hash: CryptoHash,
137    },
138    NewEvents {
139        height: BlockHeight,
140        hash: CryptoHash,
141        event_streams: BTreeSet<StreamId>,
142    },
143    NewIncomingBundle {
144        origin: ChainId,
145        height: BlockHeight,
146    },
147    NewRound {
148        height: BlockHeight,
149        round: Round,
150    },
151}
152
/// Error type for worker operations.
///
/// Use [`WorkerError::is_local`] to tell errors caused by the local node apart from errors
/// that a bad message from a peer could have caused.
#[derive(Debug, Error)]
pub enum WorkerError {
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// A storage view error. [`WorkerError::is_local`] treats `ViewError::NotFound` as
    /// non-local and every other view error as local.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// The listed certificate hashes are in the confirmed log but could not be read from
    /// storage.
    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// A boxed [`ChainError`]. The `From<ChainError>` conversion in this file turns an
    /// `ExecutionError::BlobsNotFound` into [`WorkerError::BlobsNotFound`] instead.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    // Chain access control
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    #[error(
        "The given outcome is not what we computed after executing the block.\n\
        Computed: {computed:#?}\n\
        Submitted: {submitted:#?}"
    )]
    IncorrectOutcome {
        computed: Box<BlockExecutionOutcome>,
        submitted: Box<BlockExecutionOutcome>,
    },
    #[error("The block timestamp is in the future.")]
    InvalidTimestamp,
    /// Returned when a lite certificate's value is not in the worker's block cache.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    /// Returned when a lite certificate cannot be combined with the cached value, or has a
    /// kind other than confirmed/validated.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    // NOTE(review): `{chain_id:8}` is a minimum *width*, not a truncation (`{chain_id:.8}`);
    // confirm which was intended.
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    /// A spawned worker task could not be joined.
    #[error("Failed to join spawned worker task")]
    JoinError,
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error("Missing network description")]
    MissingNetworkDescription,
    /// Presumably returned when a request cannot be sent to a chain worker actor's
    /// endpoint — confirm in `call_and_maybe_create_chain_worker_endpoint`.
    #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
    ChainActorSendError {
        chain_id: ChainId,
        error: Box<dyn std::error::Error + Send + Sync>,
    },
    /// Returned by `query_chain_worker` when the actor drops the response callback without
    /// answering.
    #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
    ChainActorRecvError {
        chain_id: ChainId,
        error: Box<dyn std::error::Error + Send + Sync>,
    },
}
252
253impl WorkerError {
254    /// Returns whether this error is caused by an issue in the local node.
255    ///
256    /// Returns `false` whenever the error could be caused by a bad message from a peer.
257    pub fn is_local(&self) -> bool {
258        match self {
259            WorkerError::CryptoError(_)
260            | WorkerError::ArithmeticError(_)
261            | WorkerError::InvalidOwner
262            | WorkerError::InvalidSigner(_)
263            | WorkerError::UnexpectedBlockHeight { .. }
264            | WorkerError::InvalidEpoch { .. }
265            | WorkerError::EventsNotFound(_)
266            | WorkerError::InvalidBlockChaining
267            | WorkerError::IncorrectOutcome { .. }
268            | WorkerError::InvalidTimestamp
269            | WorkerError::MissingCertificateValue
270            | WorkerError::InvalidLiteCertificate
271            | WorkerError::FastBlockUsingOracles
272            | WorkerError::BlobsNotFound(_)
273            | WorkerError::InvalidBlockProposal(_)
274            | WorkerError::UnexpectedBlob
275            | WorkerError::TooManyPublishedBlobs(_)
276            | WorkerError::ViewError(ViewError::NotFound(_)) => false,
277            WorkerError::InvalidCrossChainRequest
278            | WorkerError::ViewError(_)
279            | WorkerError::ConfirmedLogEntryNotFound { .. }
280            | WorkerError::PreprocessedBlocksEntryNotFound { .. }
281            | WorkerError::JoinError
282            | WorkerError::MissingNetworkDescription
283            | WorkerError::ChainActorSendError { .. }
284            | WorkerError::ChainActorRecvError { .. }
285            | WorkerError::ReadCertificatesError(_) => true,
286            WorkerError::ChainError(chain_error) => chain_error.is_local(),
287        }
288    }
289}
290
291impl From<ChainError> for WorkerError {
292    #[instrument(level = "trace", skip(chain_error))]
293    fn from(chain_error: ChainError) -> Self {
294        match chain_error {
295            ChainError::ExecutionError(execution_error, context) => {
296                if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
297                    Self::BlobsNotFound(blob_ids)
298                } else {
299                    Self::ChainError(Box::new(ChainError::ExecutionError(
300                        execution_error,
301                        context,
302                    )))
303                }
304            }
305            error => Self::ChainError(Box::new(error)),
306        }
307    }
308}
309
#[cfg(with_testing)]
impl WorkerError {
    /// Extracts the [`ExecutionError`] wrapped in this error.
    ///
    /// # Panics
    ///
    /// If this is not a [`WorkerError::ChainError`] holding a [`ChainError::ExecutionError`],
    /// or if that execution error's context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            unexpected => panic!("Expected an `ExecutionError`. Got: {unexpected:#?}"),
        };

        match *chain_error {
            ChainError::ExecutionError(execution_error, context) => {
                assert_eq!(context, expected_context);
                *execution_error
            }
            unexpected => panic!("Expected an `ExecutionError`. Got: {unexpected:#?}"),
        }
    }
}
331
332/// State of a worker in a validator or a local node.
333pub struct WorkerState<StorageClient>
334where
335    StorageClient: Storage,
336{
337    /// A name used for logging
338    nickname: String,
339    /// Access to local persistent storage.
340    storage: StorageClient,
341    /// Configuration options for the [`ChainWorker`]s.
342    chain_worker_config: ChainWorkerConfig,
343    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
344    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
345    /// Chain IDs that should be tracked by a worker.
346    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
347    /// One-shot channels to notify callers when messages of a particular chain have been
348    /// delivered.
349    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
350    /// The set of spawned [`ChainWorkerActor`] tasks.
351    chain_worker_tasks: Arc<Mutex<JoinSet>>,
352    /// The cache of running [`ChainWorkerActor`]s.
353    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
354}
355
356impl<StorageClient> Clone for WorkerState<StorageClient>
357where
358    StorageClient: Storage + Clone,
359{
360    fn clone(&self) -> Self {
361        WorkerState {
362            nickname: self.nickname.clone(),
363            storage: self.storage.clone(),
364            chain_worker_config: self.chain_worker_config.clone(),
365            block_cache: self.block_cache.clone(),
366            execution_state_cache: self.execution_state_cache.clone(),
367            tracked_chains: self.tracked_chains.clone(),
368            delivery_notifiers: self.delivery_notifiers.clone(),
369            chain_worker_tasks: self.chain_worker_tasks.clone(),
370            chain_workers: self.chain_workers.clone(),
371        }
372    }
373}
374
/// The sender endpoint for [`ChainWorkerRequest`]s.
///
/// Each request is sent together with the [`tracing::Span`] that was current at the call site.
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
    ChainWorkerRequest<<StorageClient as Storage>::Context>,
    tracing::Span,
)>;

/// Per-chain [`DeliveryNotifier`]s, used to notify callers when messages of a particular
/// chain have been delivered.
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
382
383impl<StorageClient> WorkerState<StorageClient>
384where
385    StorageClient: Storage,
386{
387    #[instrument(level = "trace", skip(nickname, key_pair, storage))]
388    pub fn new(
389        nickname: String,
390        key_pair: Option<ValidatorSecretKey>,
391        storage: StorageClient,
392        block_cache_size: usize,
393        execution_state_cache_size: usize,
394    ) -> Self {
395        WorkerState {
396            nickname,
397            storage,
398            chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
399            block_cache: Arc::new(ValueCache::new(block_cache_size)),
400            execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
401            tracked_chains: None,
402            delivery_notifiers: Arc::default(),
403            chain_worker_tasks: Arc::default(),
404            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
405        }
406    }
407
408    #[instrument(level = "trace", skip(nickname, storage))]
409    pub fn new_for_client(
410        nickname: String,
411        storage: StorageClient,
412        tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
413        block_cache_size: usize,
414        execution_state_cache_size: usize,
415    ) -> Self {
416        WorkerState {
417            nickname,
418            storage,
419            chain_worker_config: ChainWorkerConfig::default(),
420            block_cache: Arc::new(ValueCache::new(block_cache_size)),
421            execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
422            tracked_chains: Some(tracked_chains),
423            delivery_notifiers: Arc::default(),
424            chain_worker_tasks: Arc::default(),
425            chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
426        }
427    }
428
429    #[instrument(level = "trace", skip(self, value))]
430    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
431        self.chain_worker_config.allow_inactive_chains = value;
432        self
433    }
434
435    #[instrument(level = "trace", skip(self, value))]
436    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
437        self.chain_worker_config
438            .allow_messages_from_deprecated_epochs = value;
439        self
440    }
441
442    #[instrument(level = "trace", skip(self, value))]
443    pub fn with_long_lived_services(mut self, value: bool) -> Self {
444        self.chain_worker_config.long_lived_services = value;
445        self
446    }
447
448    /// Returns an instance with the specified grace period.
449    ///
450    /// Blocks with a timestamp this far in the future will still be accepted, but the validator
451    /// will wait until that timestamp before voting.
452    #[instrument(level = "trace", skip(self))]
453    pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
454        self.chain_worker_config.grace_period = grace_period;
455        self
456    }
457
458    /// Returns an instance with the specified chain worker TTL.
459    ///
460    /// Idle chain workers free their memory after that duration without requests.
461    #[instrument(level = "trace", skip(self))]
462    pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
463        self.chain_worker_config.ttl = chain_worker_ttl;
464        self
465    }
466
467    /// Returns an instance with the specified sender chain worker TTL.
468    ///
469    /// Idle sender chain workers free their memory after that duration without requests.
470    #[instrument(level = "trace", skip(self))]
471    pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
472        self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
473        self
474    }
475
476    /// Returns an instance with the specified maximum size for received_log entries.
477    ///
478    /// Sizes below `CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES` should be avoided.
479    #[instrument(level = "trace", skip(self))]
480    pub fn with_chain_info_max_received_log_entries(
481        mut self,
482        chain_info_max_received_log_entries: usize,
483    ) -> Self {
484        if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
485            warn!(
486                "The value set for the maximum size of received_log entries \
487                   may not be compatible with the latest clients: {} instead of {}",
488                chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
489            );
490        }
491        self.chain_worker_config.chain_info_max_received_log_entries =
492            chain_info_max_received_log_entries;
493        self
494    }
495
496    #[instrument(level = "trace", skip(self))]
497    pub fn nickname(&self) -> &str {
498        &self.nickname
499    }
500
501    /// Returns the storage client so that it can be manipulated or queried.
502    #[instrument(level = "trace", skip(self))]
503    #[cfg(not(feature = "test"))]
504    pub(crate) fn storage_client(&self) -> &StorageClient {
505        &self.storage
506    }
507
508    /// Returns the storage client so that it can be manipulated or queried by tests in other
509    /// crates.
510    #[instrument(level = "trace", skip(self))]
511    #[cfg(feature = "test")]
512    pub fn storage_client(&self) -> &StorageClient {
513        &self.storage
514    }
515
516    #[instrument(level = "trace", skip(self, certificate))]
517    pub(crate) async fn full_certificate(
518        &self,
519        certificate: LiteCertificate<'_>,
520    ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
521        let block = self
522            .block_cache
523            .get(&certificate.value.value_hash)
524            .ok_or(WorkerError::MissingCertificateValue)?;
525
526        match certificate.value.kind {
527            linera_chain::types::CertificateKind::Confirmed => {
528                let value = ConfirmedBlock::from_hashed(block);
529                Ok(Either::Left(
530                    certificate
531                        .with_value(value)
532                        .ok_or(WorkerError::InvalidLiteCertificate)?,
533                ))
534            }
535            linera_chain::types::CertificateKind::Validated => {
536                let value = ValidatedBlock::from_hashed(block);
537                Ok(Either::Right(
538                    certificate
539                        .with_value(value)
540                        .ok_or(WorkerError::InvalidLiteCertificate)?,
541                ))
542            }
543            _ => Err(WorkerError::InvalidLiteCertificate),
544        }
545    }
546}
547
/// A certificate value type whose certificates a [`WorkerState`] can process.
///
/// This file implements it for [`ConfirmedBlock`], [`ValidatedBlock`] and [`Timeout`], each
/// delegating to the matching `handle_*_certificate` method of the worker.
// NOTE(review): on non-`web` targets `trait_variant::make(Send)` rewrites this trait,
// presumably so that `process_certificate` futures are `Send` (required when they are
// spawned). The `allow` silences the lint about `async fn` in public traits. Confirm against
// the `trait_variant` docs.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` on `worker`, returning the chain info response and the network
    /// actions to perform.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
556
557impl ProcessableCertificate for ConfirmedBlock {
558    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
559        worker: &WorkerState<S>,
560        certificate: ConfirmedBlockCertificate,
561    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
562        worker.handle_confirmed_certificate(certificate, None).await
563    }
564}
565
566impl ProcessableCertificate for ValidatedBlock {
567    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
568        worker: &WorkerState<S>,
569        certificate: ValidatedBlockCertificate,
570    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
571        worker.handle_validated_certificate(certificate).await
572    }
573}
574
575impl ProcessableCertificate for Timeout {
576    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
577        worker: &WorkerState<S>,
578        certificate: TimeoutCertificate,
579    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
580        worker.handle_timeout_certificate(certificate).await
581    }
582}
583
584impl<StorageClient> WorkerState<StorageClient>
585where
586    StorageClient: Storage + Clone + Send + Sync + 'static,
587{
588    #[instrument(level = "trace", skip(self, certificate, notifier))]
589    #[inline]
590    pub async fn fully_handle_certificate_with_notifications<T>(
591        &self,
592        certificate: GenericCertificate<T>,
593        notifier: &impl Notifier,
594    ) -> Result<ChainInfoResponse, WorkerError>
595    where
596        T: ProcessableCertificate,
597    {
598        let notifications = (*notifier).clone();
599        let this = self.clone();
600        linera_base::task::spawn(async move {
601            let (response, actions) =
602                ProcessableCertificate::process_certificate(&this, certificate).await?;
603            notifications.notify(&actions.notifications);
604            let mut requests = VecDeque::from(actions.cross_chain_requests);
605            while let Some(request) = requests.pop_front() {
606                let actions = this.handle_cross_chain_request(request).await?;
607                requests.extend(actions.cross_chain_requests);
608                notifications.notify(&actions.notifications);
609            }
610            Ok(response)
611        })
612        .await
613        .unwrap_or_else(|_| Err(WorkerError::JoinError))
614    }
615
616    /// Tries to execute a block proposal without any verification other than block execution.
617    #[instrument(level = "trace", skip(self, block))]
618    pub async fn stage_block_execution(
619        &self,
620        block: ProposedBlock,
621        round: Option<u32>,
622        published_blobs: Vec<Blob>,
623    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
624        self.query_chain_worker(block.chain_id, move |callback| {
625            ChainWorkerRequest::StageBlockExecution {
626                block,
627                round,
628                published_blobs,
629                callback,
630            }
631        })
632        .await
633    }
634
635    /// Executes a [`Query`] for an application's state on a specific chain.
636    #[instrument(level = "trace", skip(self, chain_id, query))]
637    pub async fn query_application(
638        &self,
639        chain_id: ChainId,
640        query: Query,
641    ) -> Result<QueryOutcome, WorkerError> {
642        self.query_chain_worker(chain_id, move |callback| {
643            ChainWorkerRequest::QueryApplication { query, callback }
644        })
645        .await
646    }
647
648    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
649        nickname = %self.nickname,
650        chain_id = %chain_id,
651        application_id = %application_id
652    ))]
653    pub async fn describe_application(
654        &self,
655        chain_id: ChainId,
656        application_id: ApplicationId,
657    ) -> Result<ApplicationDescription, WorkerError> {
658        self.query_chain_worker(chain_id, move |callback| {
659            ChainWorkerRequest::DescribeApplication {
660                application_id,
661                callback,
662            }
663        })
664        .await
665    }
666
667    /// Processes a confirmed block (aka a commit).
668    #[instrument(
669        level = "trace",
670        skip(self, certificate, notify_when_messages_are_delivered),
671        fields(
672            nickname = %self.nickname,
673            chain_id = %certificate.block().header.chain_id,
674            block_height = %certificate.block().header.height
675        )
676    )]
677    async fn process_confirmed_block(
678        &self,
679        certificate: ConfirmedBlockCertificate,
680        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
681    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
682        let chain_id = certificate.block().header.chain_id;
683        self.query_chain_worker(chain_id, move |callback| {
684            ChainWorkerRequest::ProcessConfirmedBlock {
685                certificate,
686                notify_when_messages_are_delivered,
687                callback,
688            }
689        })
690        .await
691    }
692
693    /// Processes a validated block issued from a multi-owner chain.
694    #[instrument(level = "trace", skip(self, certificate), fields(
695        nickname = %self.nickname,
696        chain_id = %certificate.block().header.chain_id,
697        block_height = %certificate.block().header.height
698    ))]
699    async fn process_validated_block(
700        &self,
701        certificate: ValidatedBlockCertificate,
702    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
703        let chain_id = certificate.block().header.chain_id;
704        self.query_chain_worker(chain_id, move |callback| {
705            ChainWorkerRequest::ProcessValidatedBlock {
706                certificate,
707                callback,
708            }
709        })
710        .await
711    }
712
713    /// Processes a leader timeout issued from a multi-owner chain.
714    #[instrument(level = "trace", skip(self, certificate), fields(
715        nickname = %self.nickname,
716        chain_id = %certificate.value().chain_id(),
717        height = %certificate.value().height()
718    ))]
719    async fn process_timeout(
720        &self,
721        certificate: TimeoutCertificate,
722    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
723        let chain_id = certificate.value().chain_id();
724        self.query_chain_worker(chain_id, move |callback| {
725            ChainWorkerRequest::ProcessTimeout {
726                certificate,
727                callback,
728            }
729        })
730        .await
731    }
732
733    #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
734        nickname = %self.nickname,
735        origin = %origin,
736        recipient = %recipient,
737        num_bundles = %bundles.len()
738    ))]
739    async fn process_cross_chain_update(
740        &self,
741        origin: ChainId,
742        recipient: ChainId,
743        bundles: Vec<(Epoch, MessageBundle)>,
744    ) -> Result<Option<BlockHeight>, WorkerError> {
745        self.query_chain_worker(recipient, move |callback| {
746            ChainWorkerRequest::ProcessCrossChainUpdate {
747                origin,
748                bundles,
749                callback,
750            }
751        })
752        .await
753    }
754
755    /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block.
756    #[instrument(level = "trace", skip(self, chain_id, height), fields(
757        nickname = %self.nickname,
758        chain_id = %chain_id,
759        height = %height
760    ))]
761    #[cfg(with_testing)]
762    pub async fn read_certificate(
763        &self,
764        chain_id: ChainId,
765        height: BlockHeight,
766    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
767        self.query_chain_worker(chain_id, move |callback| {
768            ChainWorkerRequest::ReadCertificate { height, callback }
769        })
770        .await
771    }
772
773    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
774    /// [`ChainId`].
775    ///
776    /// The returned view holds a lock on the chain state, which prevents the worker from changing
777    /// the state of that chain.
778    #[instrument(level = "trace", skip(self), fields(
779        nickname = %self.nickname,
780        chain_id = %chain_id
781    ))]
782    pub async fn chain_state_view(
783        &self,
784        chain_id: ChainId,
785    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
786        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
787            callback,
788        })
789        .await
790    }
791
792    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
793    #[instrument(level = "trace", skip(self, request_builder), fields(
794        nickname = %self.nickname,
795        chain_id = %chain_id
796    ))]
797    async fn query_chain_worker<Response>(
798        &self,
799        chain_id: ChainId,
800        request_builder: impl FnOnce(
801            oneshot::Sender<Result<Response, WorkerError>>,
802        ) -> ChainWorkerRequest<StorageClient::Context>,
803    ) -> Result<Response, WorkerError> {
804        // Build the request.
805        let (callback, response) = oneshot::channel();
806        let request = request_builder(callback);
807
808        // Call the endpoint, possibly a new one.
809        let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
810
811        // We just created an endpoint: spawn the actor.
812        if let Some(receiver) = new_receiver {
813            let delivery_notifier = self
814                .delivery_notifiers
815                .lock()
816                .unwrap()
817                .entry(chain_id)
818                .or_default()
819                .clone();
820
821            let is_tracked = self
822                .tracked_chains
823                .as_ref()
824                .is_some_and(|tracked_chains| tracked_chains.read().unwrap().contains(&chain_id));
825
826            let actor_task = ChainWorkerActor::run(
827                self.chain_worker_config.clone(),
828                self.storage.clone(),
829                self.block_cache.clone(),
830                self.execution_state_cache.clone(),
831                self.tracked_chains.clone(),
832                delivery_notifier,
833                chain_id,
834                receiver,
835                is_tracked,
836            );
837
838            self.chain_worker_tasks
839                .lock()
840                .unwrap()
841                .spawn_task(actor_task);
842        }
843
844        // Finally, wait a response.
845        match response.await {
846            Err(e) => {
847                // The actor endpoint was dropped. Better luck next time!
848                Err(WorkerError::ChainActorRecvError {
849                    chain_id,
850                    error: Box::new(e),
851                })
852            }
853            Ok(response) => response,
854        }
855    }
856
857    /// Find an endpoint and call it. Create the endpoint if necessary.
858    #[instrument(level = "trace", skip(self), fields(
859        nickname = %self.nickname,
860        chain_id = %chain_id
861    ))]
862    #[expect(clippy::type_complexity)]
863    fn call_and_maybe_create_chain_worker_endpoint(
864        &self,
865        chain_id: ChainId,
866        request: ChainWorkerRequest<StorageClient::Context>,
867    ) -> Result<
868        Option<
869            mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
870        >,
871        WorkerError,
872    > {
873        let mut chain_workers = self.chain_workers.lock().unwrap();
874
875        let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
876            (endpoint, None)
877        } else {
878            let (sender, receiver) = mpsc::unbounded_channel();
879            (sender, Some(receiver))
880        };
881
882        if let Err(e) = sender.send((request, tracing::Span::current())) {
883            // The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
884            return Err(WorkerError::ChainActorSendError {
885                chain_id,
886                error: Box::new(e),
887            });
888        }
889
890        // Put back the sender in the cache for next time.
891        chain_workers.insert(chain_id, sender);
892
893        Ok(new_receiver)
894    }
895
896    #[instrument(skip_all, fields(
897        nick = self.nickname,
898        chain_id = format!("{:.8}", proposal.content.block.chain_id),
899        height = %proposal.content.block.height,
900    ))]
901    pub async fn handle_block_proposal(
902        &self,
903        proposal: BlockProposal,
904    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
905        trace!("{} <-- {:?}", self.nickname, proposal);
906        #[cfg(with_metrics)]
907        let round = proposal.content.round;
908        let response = self
909            .query_chain_worker(proposal.content.block.chain_id, move |callback| {
910                ChainWorkerRequest::HandleBlockProposal { proposal, callback }
911            })
912            .await?;
913        #[cfg(with_metrics)]
914        metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
915            .with_label_values(&[round.type_name()])
916            .observe(round.number() as f64);
917        Ok(response)
918    }
919
920    /// Processes a certificate, e.g. to extend a chain with a confirmed block.
921    // Other fields will be included in handle_certificate's span.
922    #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
923    pub async fn handle_lite_certificate(
924        &self,
925        certificate: LiteCertificate<'_>,
926        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
927    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
928        match self.full_certificate(certificate).await? {
929            Either::Left(confirmed) => {
930                self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
931                    .await
932            }
933            Either::Right(validated) => {
934                if let Some(notifier) = notify_when_messages_are_delivered {
935                    // Nothing to wait for.
936                    if let Err(()) = notifier.send(()) {
937                        warn!("Failed to notify message delivery to caller");
938                    }
939                }
940                self.handle_validated_certificate(validated).await
941            }
942        }
943    }
944
945    /// Processes a confirmed block certificate.
946    #[instrument(skip_all, fields(
947        nick = self.nickname,
948        chain_id = format!("{:.8}", certificate.block().header.chain_id),
949        height = %certificate.block().header.height,
950    ))]
951    pub async fn handle_confirmed_certificate(
952        &self,
953        certificate: ConfirmedBlockCertificate,
954        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
955    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
956        trace!("{} <-- {:?}", self.nickname, certificate);
957        #[cfg(with_metrics)]
958        let metrics_data = (
959            certificate.inner().to_log_str(),
960            certificate.round.type_name(),
961            certificate.round.number(),
962            certificate.block().body.transactions.len() as u64,
963            certificate
964                .signatures()
965                .iter()
966                .map(|(validator_name, _)| validator_name.to_string())
967                .collect::<Vec<_>>(),
968        );
969
970        let (info, actions, _outcome) = self
971            .process_confirmed_block(certificate, notify_when_messages_are_delivered)
972            .await?;
973
974        #[cfg(with_metrics)]
975        {
976            if matches!(_outcome, BlockOutcome::Processed) {
977                let (
978                    certificate_log_str,
979                    round_type,
980                    round_number,
981                    confirmed_transactions,
982                    validators_with_signatures,
983                ) = metrics_data;
984                metrics::NUM_BLOCKS.with_label_values(&[]).inc();
985                metrics::NUM_ROUNDS_IN_CERTIFICATE
986                    .with_label_values(&[certificate_log_str, round_type])
987                    .observe(round_number as f64);
988                if confirmed_transactions > 0 {
989                    metrics::TRANSACTION_COUNT
990                        .with_label_values(&[])
991                        .inc_by(confirmed_transactions);
992                }
993
994                for validator_name in validators_with_signatures {
995                    metrics::CERTIFICATES_SIGNED
996                        .with_label_values(&[&validator_name])
997                        .inc();
998                }
999            }
1000        }
1001        Ok((info, actions))
1002    }
1003
1004    /// Processes a validated block certificate.
1005    #[instrument(skip_all, fields(
1006        nick = self.nickname,
1007        chain_id = format!("{:.8}", certificate.block().header.chain_id),
1008        height = %certificate.block().header.height,
1009    ))]
1010    pub async fn handle_validated_certificate(
1011        &self,
1012        certificate: ValidatedBlockCertificate,
1013    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1014        trace!("{} <-- {:?}", self.nickname, certificate);
1015
1016        #[cfg(with_metrics)]
1017        let round = certificate.round;
1018        #[cfg(with_metrics)]
1019        let cert_str = certificate.inner().to_log_str();
1020
1021        let (info, actions, _outcome) = self.process_validated_block(certificate).await?;
1022        #[cfg(with_metrics)]
1023        {
1024            if matches!(_outcome, BlockOutcome::Processed) {
1025                metrics::NUM_ROUNDS_IN_CERTIFICATE
1026                    .with_label_values(&[cert_str, round.type_name()])
1027                    .observe(round.number() as f64);
1028            }
1029        }
1030        Ok((info, actions))
1031    }
1032
1033    /// Processes a timeout certificate
1034    #[instrument(skip_all, fields(
1035        nick = self.nickname,
1036        chain_id = format!("{:.8}", certificate.inner().chain_id()),
1037        height = %certificate.inner().height(),
1038    ))]
1039    pub async fn handle_timeout_certificate(
1040        &self,
1041        certificate: TimeoutCertificate,
1042    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1043        trace!("{} <-- {:?}", self.nickname, certificate);
1044        self.process_timeout(certificate).await
1045    }
1046
1047    #[instrument(skip_all, fields(
1048        nick = self.nickname,
1049        chain_id = format!("{:.8}", query.chain_id)
1050    ))]
1051    pub async fn handle_chain_info_query(
1052        &self,
1053        query: ChainInfoQuery,
1054    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1055        trace!("{} <-- {:?}", self.nickname, query);
1056        #[cfg(with_metrics)]
1057        metrics::CHAIN_INFO_QUERIES.inc();
1058        let result = self
1059            .query_chain_worker(query.chain_id, move |callback| {
1060                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1061            })
1062            .await;
1063        trace!("{} --> {:?}", self.nickname, result);
1064        result
1065    }
1066
1067    #[instrument(skip_all, fields(
1068        nick = self.nickname,
1069        chain_id = format!("{:.8}", chain_id)
1070    ))]
1071    pub async fn download_pending_blob(
1072        &self,
1073        chain_id: ChainId,
1074        blob_id: BlobId,
1075    ) -> Result<Blob, WorkerError> {
1076        trace!(
1077            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1078            self.nickname
1079        );
1080        let result = self
1081            .query_chain_worker(chain_id, move |callback| {
1082                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1083            })
1084            .await;
1085        trace!(
1086            "{} --> {:?}",
1087            self.nickname,
1088            result.as_ref().map(|_| blob_id)
1089        );
1090        result
1091    }
1092
1093    #[instrument(skip_all, fields(
1094        nick = self.nickname,
1095        chain_id = format!("{:.8}", chain_id)
1096    ))]
1097    pub async fn handle_pending_blob(
1098        &self,
1099        chain_id: ChainId,
1100        blob: Blob,
1101    ) -> Result<ChainInfoResponse, WorkerError> {
1102        let blob_id = blob.id();
1103        trace!(
1104            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1105            self.nickname
1106        );
1107        let result = self
1108            .query_chain_worker(chain_id, move |callback| {
1109                ChainWorkerRequest::HandlePendingBlob { blob, callback }
1110            })
1111            .await;
1112        trace!(
1113            "{} --> {:?}",
1114            self.nickname,
1115            result.as_ref().map(|_| blob_id)
1116        );
1117        result
1118    }
1119
1120    #[instrument(skip_all, fields(
1121        nick = self.nickname,
1122        chain_id = format!("{:.8}", request.target_chain_id())
1123    ))]
1124    pub async fn handle_cross_chain_request(
1125        &self,
1126        request: CrossChainRequest,
1127    ) -> Result<NetworkActions, WorkerError> {
1128        trace!("{} <-- {:?}", self.nickname, request);
1129        match request {
1130            CrossChainRequest::UpdateRecipient {
1131                sender,
1132                recipient,
1133                bundles,
1134            } => {
1135                let mut actions = NetworkActions::default();
1136                let origin = sender;
1137                let Some(height) = self
1138                    .process_cross_chain_update(origin, recipient, bundles)
1139                    .await?
1140                else {
1141                    return Ok(actions);
1142                };
1143                actions.notifications.push(Notification {
1144                    chain_id: recipient,
1145                    reason: Reason::NewIncomingBundle { origin, height },
1146                });
1147                actions
1148                    .cross_chain_requests
1149                    .push(CrossChainRequest::ConfirmUpdatedRecipient {
1150                        sender,
1151                        recipient,
1152                        latest_height: height,
1153                    });
1154                Ok(actions)
1155            }
1156            CrossChainRequest::ConfirmUpdatedRecipient {
1157                sender,
1158                recipient,
1159                latest_height,
1160            } => {
1161                self.query_chain_worker(sender, move |callback| {
1162                    ChainWorkerRequest::ConfirmUpdatedRecipient {
1163                        recipient,
1164                        latest_height,
1165                        callback,
1166                    }
1167                })
1168                .await?;
1169                Ok(NetworkActions::default())
1170            }
1171        }
1172    }
1173
1174    /// Updates the received certificate trackers to at least the given values.
1175    #[instrument(skip_all, fields(
1176        nickname = %self.nickname,
1177        chain_id = %chain_id,
1178        num_trackers = %new_trackers.len()
1179    ))]
1180    pub async fn update_received_certificate_trackers(
1181        &self,
1182        chain_id: ChainId,
1183        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1184    ) -> Result<(), WorkerError> {
1185        self.query_chain_worker(chain_id, move |callback| {
1186            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1187                new_trackers,
1188                callback,
1189            }
1190        })
1191        .await
1192    }
1193
1194    /// Gets preprocessed block hashes in a given height range.
1195    #[instrument(skip_all, fields(
1196        nickname = %self.nickname,
1197        chain_id = %chain_id,
1198        start = %start,
1199        end = %end
1200    ))]
1201    pub async fn get_preprocessed_block_hashes(
1202        &self,
1203        chain_id: ChainId,
1204        start: BlockHeight,
1205        end: BlockHeight,
1206    ) -> Result<Vec<CryptoHash>, WorkerError> {
1207        self.query_chain_worker(chain_id, move |callback| {
1208            ChainWorkerRequest::GetPreprocessedBlockHashes {
1209                start,
1210                end,
1211                callback,
1212            }
1213        })
1214        .await
1215    }
1216
1217    /// Gets the next block height to receive from an inbox.
1218    #[instrument(skip_all, fields(
1219        nickname = %self.nickname,
1220        chain_id = %chain_id,
1221        origin = %origin
1222    ))]
1223    pub async fn get_inbox_next_height(
1224        &self,
1225        chain_id: ChainId,
1226        origin: ChainId,
1227    ) -> Result<BlockHeight, WorkerError> {
1228        self.query_chain_worker(chain_id, move |callback| {
1229            ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1230        })
1231        .await
1232    }
1233
1234    /// Gets locking blobs for specific blob IDs.
1235    /// Returns `Ok(None)` if any of the blobs is not found.
1236    #[instrument(skip_all, fields(
1237        nickname = %self.nickname,
1238        chain_id = %chain_id,
1239        num_blob_ids = %blob_ids.len()
1240    ))]
1241    pub async fn get_locking_blobs(
1242        &self,
1243        chain_id: ChainId,
1244        blob_ids: Vec<BlobId>,
1245    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1246        self.query_chain_worker(chain_id, move |callback| {
1247            ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1248        })
1249        .await
1250    }
1251
1252    /// Reads a range from the confirmed log.
1253    #[instrument(skip_all, fields(
1254        nickname = %self.nickname,
1255        chain_id = %chain_id,
1256        start = %start,
1257        end = %end
1258    ))]
1259    pub async fn read_confirmed_log(
1260        &self,
1261        chain_id: ChainId,
1262        start: BlockHeight,
1263        end: BlockHeight,
1264    ) -> Result<Vec<CryptoHash>, WorkerError> {
1265        self.query_chain_worker(chain_id, move |callback| {
1266            ChainWorkerRequest::ReadConfirmedLog {
1267                start,
1268                end,
1269                callback,
1270            }
1271        })
1272        .await
1273    }
1274}
1275
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the [`ValidatorPublicKey`] of the key pair in this worker's chain worker config.
    ///
    /// # Panics
    ///
    /// If the validator doesn't have a key pair assigned to it.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain it's public key",
        );
        key_pair.public()
    }
}