1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{
15 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16 },
17 doc_scalar,
18 hashed::Hashed,
19 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20 time::Instant,
21 util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
27 types::{
28 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
29 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
30 },
31 ChainError, ChainStateView,
32};
33use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
34use linera_storage::Storage;
35use linera_views::{context::InactiveContext, ViewError};
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
39use tracing::{error, instrument, trace, warn};
40
41pub(crate) use crate::chain_worker::EventSubscriptionsResult;
43use crate::{
44 chain_worker::{
45 BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
46 },
47 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
48 join_set_ext::{JoinSet, JoinSetExt},
49 notifier::Notifier,
50 value_cache::ValueCache,
51 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
52};
53
54#[cfg(test)]
55#[path = "unit_tests/worker_tests.rs"]
56mod worker_tests;
57
58#[cfg(with_metrics)]
59mod metrics {
60 use std::sync::LazyLock;
61
62 use linera_base::prometheus_util::{
63 exponential_bucket_interval, register_histogram_vec, register_int_counter,
64 register_int_counter_vec,
65 };
66 use prometheus::{HistogramVec, IntCounter, IntCounterVec};
67
68 pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
69 register_histogram_vec(
70 "num_rounds_in_certificate",
71 "Number of rounds in certificate",
72 &["certificate_value", "round_type"],
73 exponential_bucket_interval(0.1, 50.0),
74 )
75 });
76
77 pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
78 register_histogram_vec(
79 "num_rounds_in_block_proposal",
80 "Number of rounds in block proposal",
81 &["round_type"],
82 exponential_bucket_interval(0.1, 50.0),
83 )
84 });
85
86 pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
87 LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
88
89 pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
90 LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));
91
92 pub static OPERATION_COUNT: LazyLock<IntCounter> =
93 LazyLock::new(|| register_int_counter("operation_count", "Operation count"));
94
95 pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
96 register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
97 });
98
99 pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
100 register_int_counter_vec(
101 "certificates_signed",
102 "Number of confirmed block certificates signed by each validator",
103 &["validator_name"],
104 )
105 });
106
107 pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
108 register_int_counter(
109 "chain_info_queries",
110 "Number of chain info queries processed",
111 )
112 });
113}
114
115#[derive(Default, Debug)]
117pub struct NetworkActions {
118 pub cross_chain_requests: Vec<CrossChainRequest>,
120 pub notifications: Vec<Notification>,
122}
123
124impl NetworkActions {
125 pub fn extend(&mut self, other: NetworkActions) {
126 self.cross_chain_requests.extend(other.cross_chain_requests);
127 self.notifications.extend(other.notifications);
128 }
129}
130
131#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
132pub struct Notification {
134 pub chain_id: ChainId,
135 pub reason: Reason,
136}
137
138doc_scalar!(
139 Notification,
140 "Notify that a chain has a new certified block or a new message"
141);
142
143#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
144pub enum Reason {
146 NewBlock {
147 height: BlockHeight,
148 hash: CryptoHash,
149 },
150 NewEvents {
151 height: BlockHeight,
152 hash: CryptoHash,
153 event_streams: BTreeSet<StreamId>,
154 },
155 NewIncomingBundle {
156 origin: ChainId,
157 height: BlockHeight,
158 },
159 NewRound {
160 height: BlockHeight,
161 round: Round,
162 },
163 BlockExecuted {
164 height: BlockHeight,
165 hash: CryptoHash,
166 },
167}
168
169#[derive(Debug, Error)]
171pub enum WorkerError {
172 #[error(transparent)]
173 CryptoError(#[from] CryptoError),
174
175 #[error(transparent)]
176 ArithmeticError(#[from] ArithmeticError),
177
178 #[error(transparent)]
179 ViewError(#[from] ViewError),
180
181 #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
182 ReadCertificatesError(Vec<CryptoHash>),
183
184 #[error(transparent)]
185 ChainError(#[from] Box<ChainError>),
186
187 #[error(transparent)]
188 BcsError(#[from] bcs::Error),
189
190 #[error("Block was not signed by an authorized owner")]
192 InvalidOwner,
193
194 #[error("Operations in the block are not authenticated by the proper owner: {0}")]
195 InvalidSigner(AccountOwner),
196
197 #[error(
199 "Chain is expecting a next block at height {expected_block_height} but the given block \
200 is at height {found_block_height} instead"
201 )]
202 UnexpectedBlockHeight {
203 expected_block_height: BlockHeight,
204 found_block_height: BlockHeight,
205 },
206 #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
207 InvalidEpoch {
208 chain_id: ChainId,
209 chain_epoch: Epoch,
210 epoch: Epoch,
211 },
212
213 #[error("Events not found: {0:?}")]
214 EventsNotFound(Vec<EventId>),
215
216 #[error("Invalid cross-chain request")]
218 InvalidCrossChainRequest,
219 #[error("The block does not contain the hash that we expected for the previous block")]
220 InvalidBlockChaining,
221 #[error(
222 "The given outcome is not what we computed after executing the block.\n\
223 Computed: {computed:#?}\n\
224 Submitted: {submitted:#?}"
225 )]
226 IncorrectOutcome {
227 computed: Box<BlockExecutionOutcome>,
228 submitted: Box<BlockExecutionOutcome>,
229 },
230 #[error(
231 "Block timestamp ({block_timestamp}) is further in the future from local time \
232 ({local_time}) than block time grace period ({block_time_grace_period:?})"
233 )]
234 InvalidTimestamp {
235 block_timestamp: Timestamp,
236 local_time: Timestamp,
237 block_time_grace_period: Duration,
238 },
239 #[error("We don't have the value for the certificate.")]
240 MissingCertificateValue,
241 #[error("The hash certificate doesn't match its value.")]
242 InvalidLiteCertificate,
243 #[error("Fast blocks cannot query oracles")]
244 FastBlockUsingOracles,
245 #[error("Blobs not found: {0:?}")]
246 BlobsNotFound(Vec<BlobId>),
247 #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
248 ConfirmedLogEntryNotFound {
249 height: BlockHeight,
250 chain_id: ChainId,
251 },
252 #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
253 PreprocessedBlocksEntryNotFound {
254 height: BlockHeight,
255 chain_id: ChainId,
256 },
257 #[error("The block proposal is invalid: {0}")]
258 InvalidBlockProposal(String),
259 #[error("Blob was not required by any pending block")]
260 UnexpectedBlob,
261 #[error("Number of published blobs per block must not exceed {0}")]
262 TooManyPublishedBlobs(u64),
263 #[error("Missing network description")]
264 MissingNetworkDescription,
265 #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
266 ChainActorSendError {
267 chain_id: ChainId,
268 error: Box<dyn DynError>,
269 },
270 #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
271 ChainActorRecvError {
272 chain_id: ChainId,
273 error: Box<dyn DynError>,
274 },
275
276 #[error("thread error: {0}")]
277 Thread(#[from] web_thread_pool::Error),
278}
279
280impl WorkerError {
281 pub fn is_local(&self) -> bool {
285 match self {
286 WorkerError::CryptoError(_)
287 | WorkerError::ArithmeticError(_)
288 | WorkerError::InvalidOwner
289 | WorkerError::InvalidSigner(_)
290 | WorkerError::UnexpectedBlockHeight { .. }
291 | WorkerError::InvalidEpoch { .. }
292 | WorkerError::EventsNotFound(_)
293 | WorkerError::InvalidBlockChaining
294 | WorkerError::IncorrectOutcome { .. }
295 | WorkerError::InvalidTimestamp { .. }
296 | WorkerError::MissingCertificateValue
297 | WorkerError::InvalidLiteCertificate
298 | WorkerError::FastBlockUsingOracles
299 | WorkerError::BlobsNotFound(_)
300 | WorkerError::InvalidBlockProposal(_)
301 | WorkerError::UnexpectedBlob
302 | WorkerError::TooManyPublishedBlobs(_)
303 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
304 WorkerError::BcsError(_)
305 | WorkerError::InvalidCrossChainRequest
306 | WorkerError::ViewError(_)
307 | WorkerError::ConfirmedLogEntryNotFound { .. }
308 | WorkerError::PreprocessedBlocksEntryNotFound { .. }
309 | WorkerError::MissingNetworkDescription
310 | WorkerError::ChainActorSendError { .. }
311 | WorkerError::ChainActorRecvError { .. }
312 | WorkerError::Thread(_)
313 | WorkerError::ReadCertificatesError(_) => true,
314 WorkerError::ChainError(chain_error) => chain_error.is_local(),
315 }
316 }
317}
318
319impl From<ChainError> for WorkerError {
320 #[instrument(level = "trace", skip(chain_error))]
321 fn from(chain_error: ChainError) -> Self {
322 match chain_error {
323 ChainError::ExecutionError(execution_error, context) => match *execution_error {
324 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
325 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
326 _ => Self::ChainError(Box::new(ChainError::ExecutionError(
327 execution_error,
328 context,
329 ))),
330 },
331 error => Self::ChainError(Box::new(error)),
332 }
333 }
334}
335
336#[cfg(with_testing)]
337impl WorkerError {
338 pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
344 let WorkerError::ChainError(chain_error) = self else {
345 panic!("Expected an `ExecutionError`. Got: {self:#?}");
346 };
347
348 let ChainError::ExecutionError(execution_error, context) = *chain_error else {
349 panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
350 };
351
352 assert_eq!(context, expected_context);
353
354 *execution_error
355 }
356}
357
358pub struct WorkerState<StorageClient>
360where
361 StorageClient: Storage,
362{
363 nickname: String,
365 storage: StorageClient,
367 chain_worker_config: ChainWorkerConfig,
369 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
370 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
371 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
373 delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
376 chain_worker_tasks: Arc<Mutex<JoinSet>>,
378 chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
380}
381
382impl<StorageClient> Clone for WorkerState<StorageClient>
383where
384 StorageClient: Storage + Clone,
385{
386 fn clone(&self) -> Self {
387 WorkerState {
388 nickname: self.nickname.clone(),
389 storage: self.storage.clone(),
390 chain_worker_config: self.chain_worker_config.clone(),
391 block_cache: self.block_cache.clone(),
392 execution_state_cache: self.execution_state_cache.clone(),
393 tracked_chains: self.tracked_chains.clone(),
394 delivery_notifiers: self.delivery_notifiers.clone(),
395 chain_worker_tasks: self.chain_worker_tasks.clone(),
396 chain_workers: self.chain_workers.clone(),
397 }
398 }
399}
400
401type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
403 ChainWorkerRequest<<StorageClient as Storage>::Context>,
404 tracing::Span,
405 Instant,
406)>;
407
408pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
409
410impl<StorageClient> WorkerState<StorageClient>
411where
412 StorageClient: Storage,
413{
414 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
415 pub fn new(
416 nickname: String,
417 key_pair: Option<ValidatorSecretKey>,
418 storage: StorageClient,
419 block_cache_size: usize,
420 execution_state_cache_size: usize,
421 ) -> Self {
422 WorkerState {
423 nickname,
424 storage,
425 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
426 block_cache: Arc::new(ValueCache::new(block_cache_size)),
427 execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
428 tracked_chains: None,
429 delivery_notifiers: Arc::default(),
430 chain_worker_tasks: Arc::default(),
431 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
432 }
433 }
434
435 #[instrument(level = "trace", skip(nickname, storage))]
436 pub fn new_for_client(
437 nickname: String,
438 storage: StorageClient,
439 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
440 block_cache_size: usize,
441 execution_state_cache_size: usize,
442 ) -> Self {
443 WorkerState {
444 nickname,
445 storage,
446 chain_worker_config: ChainWorkerConfig::default(),
447 block_cache: Arc::new(ValueCache::new(block_cache_size)),
448 execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
449 tracked_chains: Some(tracked_chains),
450 delivery_notifiers: Arc::default(),
451 chain_worker_tasks: Arc::default(),
452 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
453 }
454 }
455
456 #[instrument(level = "trace", skip(self, value))]
457 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
458 self.chain_worker_config.allow_inactive_chains = value;
459 self
460 }
461
462 #[instrument(level = "trace", skip(self, value))]
463 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
464 self.chain_worker_config
465 .allow_messages_from_deprecated_epochs = value;
466 self
467 }
468
469 #[instrument(level = "trace", skip(self, value))]
470 pub fn with_long_lived_services(mut self, value: bool) -> Self {
471 self.chain_worker_config.long_lived_services = value;
472 self
473 }
474
475 #[instrument(level = "trace", skip(self))]
480 pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
481 self.chain_worker_config.block_time_grace_period = block_time_grace_period;
482 self
483 }
484
485 #[instrument(level = "trace", skip(self))]
489 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
490 self.chain_worker_config.ttl = chain_worker_ttl;
491 self
492 }
493
494 #[instrument(level = "trace", skip(self))]
498 pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
499 self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
500 self
501 }
502
503 #[instrument(level = "trace", skip(self))]
507 pub fn with_chain_info_max_received_log_entries(
508 mut self,
509 chain_info_max_received_log_entries: usize,
510 ) -> Self {
511 if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
512 warn!(
513 "The value set for the maximum size of received_log entries \
514 may not be compatible with the latest clients: {} instead of {}",
515 chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
516 );
517 }
518 self.chain_worker_config.chain_info_max_received_log_entries =
519 chain_info_max_received_log_entries;
520 self
521 }
522
523 #[instrument(level = "trace", skip(self))]
524 pub fn nickname(&self) -> &str {
525 &self.nickname
526 }
527
528 #[instrument(level = "trace", skip(self))]
530 #[cfg(not(feature = "test"))]
531 pub(crate) fn storage_client(&self) -> &StorageClient {
532 &self.storage
533 }
534
535 #[instrument(level = "trace", skip(self))]
538 #[cfg(feature = "test")]
539 pub fn storage_client(&self) -> &StorageClient {
540 &self.storage
541 }
542
543 #[instrument(level = "trace", skip(self, certificate))]
544 pub(crate) async fn full_certificate(
545 &self,
546 certificate: LiteCertificate<'_>,
547 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
548 let block = self
549 .block_cache
550 .get(&certificate.value.value_hash)
551 .ok_or(WorkerError::MissingCertificateValue)?;
552
553 match certificate.value.kind {
554 linera_chain::types::CertificateKind::Confirmed => {
555 let value = ConfirmedBlock::from_hashed(block);
556 Ok(Either::Left(
557 certificate
558 .with_value(value)
559 .ok_or(WorkerError::InvalidLiteCertificate)?,
560 ))
561 }
562 linera_chain::types::CertificateKind::Validated => {
563 let value = ValidatedBlock::from_hashed(block);
564 Ok(Either::Right(
565 certificate
566 .with_value(value)
567 .ok_or(WorkerError::InvalidLiteCertificate)?,
568 ))
569 }
570 _ => Err(WorkerError::InvalidLiteCertificate),
571 }
572 }
573}
574
575#[allow(async_fn_in_trait)]
576#[cfg_attr(not(web), trait_variant::make(Send))]
577pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
578 async fn process_certificate<S: Storage + Clone + 'static>(
579 worker: &WorkerState<S>,
580 certificate: GenericCertificate<Self>,
581 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
582}
583
584impl ProcessableCertificate for ConfirmedBlock {
585 async fn process_certificate<S: Storage + Clone + 'static>(
586 worker: &WorkerState<S>,
587 certificate: ConfirmedBlockCertificate,
588 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
589 Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
590 }
591}
592
593impl ProcessableCertificate for ValidatedBlock {
594 async fn process_certificate<S: Storage + Clone + 'static>(
595 worker: &WorkerState<S>,
596 certificate: ValidatedBlockCertificate,
597 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
598 Box::pin(worker.handle_validated_certificate(certificate)).await
599 }
600}
601
602impl ProcessableCertificate for Timeout {
603 async fn process_certificate<S: Storage + Clone + 'static>(
604 worker: &WorkerState<S>,
605 certificate: TimeoutCertificate,
606 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
607 worker.handle_timeout_certificate(certificate).await
608 }
609}
610
611impl<StorageClient> WorkerState<StorageClient>
612where
613 StorageClient: Storage + Clone + 'static,
614{
615 #[instrument(level = "trace", skip(self, certificate, notifier))]
616 #[inline]
617 pub async fn fully_handle_certificate_with_notifications<T>(
618 &self,
619 certificate: GenericCertificate<T>,
620 notifier: &impl Notifier,
621 ) -> Result<ChainInfoResponse, WorkerError>
622 where
623 T: ProcessableCertificate,
624 {
625 let notifications = (*notifier).clone();
626 let this = self.clone();
627 linera_base::task::spawn(async move {
628 let (response, actions) =
629 ProcessableCertificate::process_certificate(&this, certificate).await?;
630 notifications.notify(&actions.notifications);
631 let mut requests = VecDeque::from(actions.cross_chain_requests);
632 while let Some(request) = requests.pop_front() {
633 let actions = this.handle_cross_chain_request(request).await?;
634 requests.extend(actions.cross_chain_requests);
635 notifications.notify(&actions.notifications);
636 }
637 Ok(response)
638 })
639 .await
640 }
641
642 #[instrument(level = "trace", skip(self, block))]
644 pub async fn stage_block_execution(
645 &self,
646 block: ProposedBlock,
647 round: Option<u32>,
648 published_blobs: Vec<Blob>,
649 ) -> Result<(Block, ChainInfoResponse), WorkerError> {
650 self.query_chain_worker(block.chain_id, move |callback| {
651 ChainWorkerRequest::StageBlockExecution {
652 block,
653 round,
654 published_blobs,
655 callback,
656 }
657 })
658 .await
659 }
660
661 #[instrument(level = "trace", skip(self, chain_id, query))]
666 pub async fn query_application(
667 &self,
668 chain_id: ChainId,
669 query: Query,
670 block_hash: Option<CryptoHash>,
671 ) -> Result<QueryOutcome, WorkerError> {
672 self.query_chain_worker(chain_id, move |callback| {
673 ChainWorkerRequest::QueryApplication {
674 query,
675 block_hash,
676 callback,
677 }
678 })
679 .await
680 }
681
682 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
683 nickname = %self.nickname,
684 chain_id = %chain_id,
685 application_id = %application_id
686 ))]
687 pub async fn describe_application(
688 &self,
689 chain_id: ChainId,
690 application_id: ApplicationId,
691 ) -> Result<ApplicationDescription, WorkerError> {
692 self.query_chain_worker(chain_id, move |callback| {
693 ChainWorkerRequest::DescribeApplication {
694 application_id,
695 callback,
696 }
697 })
698 .await
699 }
700
701 #[instrument(
703 level = "trace",
704 skip(self, certificate, notify_when_messages_are_delivered),
705 fields(
706 nickname = %self.nickname,
707 chain_id = %certificate.block().header.chain_id,
708 block_height = %certificate.block().header.height
709 )
710 )]
711 async fn process_confirmed_block(
712 &self,
713 certificate: ConfirmedBlockCertificate,
714 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
715 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
716 let chain_id = certificate.block().header.chain_id;
717 self.query_chain_worker(chain_id, move |callback| {
718 ChainWorkerRequest::ProcessConfirmedBlock {
719 certificate,
720 notify_when_messages_are_delivered,
721 callback,
722 }
723 })
724 .await
725 }
726
727 #[instrument(level = "trace", skip(self, certificate), fields(
729 nickname = %self.nickname,
730 chain_id = %certificate.block().header.chain_id,
731 block_height = %certificate.block().header.height
732 ))]
733 async fn process_validated_block(
734 &self,
735 certificate: ValidatedBlockCertificate,
736 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
737 let chain_id = certificate.block().header.chain_id;
738 self.query_chain_worker(chain_id, move |callback| {
739 ChainWorkerRequest::ProcessValidatedBlock {
740 certificate,
741 callback,
742 }
743 })
744 .await
745 }
746
747 #[instrument(level = "trace", skip(self, certificate), fields(
749 nickname = %self.nickname,
750 chain_id = %certificate.value().chain_id(),
751 height = %certificate.value().height()
752 ))]
753 async fn process_timeout(
754 &self,
755 certificate: TimeoutCertificate,
756 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
757 let chain_id = certificate.value().chain_id();
758 self.query_chain_worker(chain_id, move |callback| {
759 ChainWorkerRequest::ProcessTimeout {
760 certificate,
761 callback,
762 }
763 })
764 .await
765 }
766
767 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
768 nickname = %self.nickname,
769 origin = %origin,
770 recipient = %recipient,
771 num_bundles = %bundles.len()
772 ))]
773 async fn process_cross_chain_update(
774 &self,
775 origin: ChainId,
776 recipient: ChainId,
777 bundles: Vec<(Epoch, MessageBundle)>,
778 ) -> Result<Option<BlockHeight>, WorkerError> {
779 self.query_chain_worker(recipient, move |callback| {
780 ChainWorkerRequest::ProcessCrossChainUpdate {
781 origin,
782 bundles,
783 callback,
784 }
785 })
786 .await
787 }
788
789 #[instrument(level = "trace", skip(self, chain_id, height), fields(
791 nickname = %self.nickname,
792 chain_id = %chain_id,
793 height = %height
794 ))]
795 #[cfg(with_testing)]
796 pub async fn read_certificate(
797 &self,
798 chain_id: ChainId,
799 height: BlockHeight,
800 ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
801 self.query_chain_worker(chain_id, move |callback| {
802 ChainWorkerRequest::ReadCertificate { height, callback }
803 })
804 .await
805 }
806
807 #[instrument(level = "trace", skip(self), fields(
813 nickname = %self.nickname,
814 chain_id = %chain_id
815 ))]
816 pub async fn chain_state_view(
817 &self,
818 chain_id: ChainId,
819 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
820 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
821 callback,
822 })
823 .await
824 }
825
826 #[instrument(level = "trace", skip(self, request_builder), fields(
828 nickname = %self.nickname,
829 chain_id = %chain_id
830 ))]
831 async fn query_chain_worker<Response>(
832 &self,
833 chain_id: ChainId,
834 request_builder: impl FnOnce(
835 oneshot::Sender<Result<Response, WorkerError>>,
836 ) -> ChainWorkerRequest<StorageClient::Context>,
837 ) -> Result<Response, WorkerError> {
838 let (callback, response) = oneshot::channel();
840 let request = request_builder(callback);
841
842 let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
844
845 if let Some(receiver) = new_receiver {
847 let delivery_notifier = self
848 .delivery_notifiers
849 .lock()
850 .unwrap()
851 .entry(chain_id)
852 .or_default()
853 .clone();
854
855 let is_tracked = self
856 .tracked_chains
857 .as_ref()
858 .is_some_and(|tracked_chains| tracked_chains.read().unwrap().contains(&chain_id));
859
860 let actor_task = ChainWorkerActor::run(
861 self.chain_worker_config.clone(),
862 self.storage.clone(),
863 self.block_cache.clone(),
864 self.execution_state_cache.clone(),
865 self.tracked_chains.clone(),
866 delivery_notifier,
867 chain_id,
868 receiver,
869 is_tracked,
870 );
871
872 self.chain_worker_tasks
873 .lock()
874 .unwrap()
875 .spawn_task(actor_task);
876 }
877
878 match response.await {
880 Err(e) => {
881 Err(WorkerError::ChainActorRecvError {
883 chain_id,
884 error: Box::new(e),
885 })
886 }
887 Ok(response) => response,
888 }
889 }
890
891 #[instrument(level = "trace", skip(self), fields(
893 nickname = %self.nickname,
894 chain_id = %chain_id
895 ))]
896 #[expect(clippy::type_complexity)]
897 fn call_and_maybe_create_chain_worker_endpoint(
898 &self,
899 chain_id: ChainId,
900 request: ChainWorkerRequest<StorageClient::Context>,
901 ) -> Result<
902 Option<
903 mpsc::UnboundedReceiver<(
904 ChainWorkerRequest<StorageClient::Context>,
905 tracing::Span,
906 Instant,
907 )>,
908 >,
909 WorkerError,
910 > {
911 let mut chain_workers = self.chain_workers.lock().unwrap();
912
913 let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
914 (endpoint, None)
915 } else {
916 let (sender, receiver) = mpsc::unbounded_channel();
917 (sender, Some(receiver))
918 };
919
920 if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
921 return Err(WorkerError::ChainActorSendError {
923 chain_id,
924 error: Box::new(e),
925 });
926 }
927
928 chain_workers.insert(chain_id, sender);
930
931 Ok(new_receiver)
932 }
933
934 #[instrument(skip_all, fields(
935 nick = self.nickname,
936 chain_id = format!("{:.8}", proposal.content.block.chain_id),
937 height = %proposal.content.block.height,
938 ))]
939 pub async fn handle_block_proposal(
940 &self,
941 proposal: BlockProposal,
942 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
943 trace!("{} <-- {:?}", self.nickname, proposal);
944 #[cfg(with_metrics)]
945 let round = proposal.content.round;
946 let response = self
947 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
948 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
949 })
950 .await?;
951 #[cfg(with_metrics)]
952 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
953 .with_label_values(&[round.type_name()])
954 .observe(round.number() as f64);
955 Ok(response)
956 }
957
958 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
961 pub async fn handle_lite_certificate(
962 &self,
963 certificate: LiteCertificate<'_>,
964 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
965 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
966 match self.full_certificate(certificate).await? {
967 Either::Left(confirmed) => {
968 Box::pin(
969 self.handle_confirmed_certificate(
970 confirmed,
971 notify_when_messages_are_delivered,
972 ),
973 )
974 .await
975 }
976 Either::Right(validated) => {
977 if let Some(notifier) = notify_when_messages_are_delivered {
978 if let Err(()) = notifier.send(()) {
980 warn!("Failed to notify message delivery to caller");
981 }
982 }
983 Box::pin(self.handle_validated_certificate(validated)).await
984 }
985 }
986 }
987
988 #[instrument(skip_all, fields(
990 nick = self.nickname,
991 chain_id = format!("{:.8}", certificate.block().header.chain_id),
992 height = %certificate.block().header.height,
993 ))]
994 pub async fn handle_confirmed_certificate(
995 &self,
996 certificate: ConfirmedBlockCertificate,
997 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
998 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
999 trace!("{} <-- {:?}", self.nickname, certificate);
1000 #[cfg(with_metrics)]
1001 let metrics_data = (
1002 certificate.inner().to_log_str(),
1003 certificate.round.type_name(),
1004 certificate.round.number(),
1005 certificate.block().body.transactions.len() as u64,
1006 certificate.block().body.incoming_bundles().count() as u64,
1007 certificate.block().body.operations().count() as u64,
1008 certificate
1009 .signatures()
1010 .iter()
1011 .map(|(validator_name, _)| validator_name.to_string())
1012 .collect::<Vec<_>>(),
1013 );
1014
1015 let (info, actions, _outcome) =
1016 Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1017 .await?;
1018
1019 #[cfg(with_metrics)]
1020 {
1021 if matches!(_outcome, BlockOutcome::Processed) {
1022 let (
1023 certificate_log_str,
1024 round_type,
1025 round_number,
1026 confirmed_transactions,
1027 confirmed_incoming_bundles,
1028 confirmed_operations,
1029 validators_with_signatures,
1030 ) = metrics_data;
1031 metrics::NUM_BLOCKS.with_label_values(&[]).inc();
1032 metrics::NUM_ROUNDS_IN_CERTIFICATE
1033 .with_label_values(&[certificate_log_str, round_type])
1034 .observe(round_number as f64);
1035 if confirmed_transactions > 0 {
1036 metrics::TRANSACTION_COUNT
1037 .with_label_values(&[])
1038 .inc_by(confirmed_transactions);
1039 if confirmed_incoming_bundles > 0 {
1040 metrics::INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
1041 }
1042 if confirmed_operations > 0 {
1043 metrics::OPERATION_COUNT.inc_by(confirmed_operations);
1044 }
1045 }
1046
1047 for validator_name in validators_with_signatures {
1048 metrics::CERTIFICATES_SIGNED
1049 .with_label_values(&[&validator_name])
1050 .inc();
1051 }
1052 }
1053 }
1054 Ok((info, actions))
1055 }
1056
1057 #[instrument(skip_all, fields(
1059 nick = self.nickname,
1060 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1061 height = %certificate.block().header.height,
1062 ))]
1063 pub async fn handle_validated_certificate(
1064 &self,
1065 certificate: ValidatedBlockCertificate,
1066 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1067 trace!("{} <-- {:?}", self.nickname, certificate);
1068
1069 #[cfg(with_metrics)]
1070 let round = certificate.round;
1071 #[cfg(with_metrics)]
1072 let cert_str = certificate.inner().to_log_str();
1073
1074 let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1075 #[cfg(with_metrics)]
1076 {
1077 if matches!(_outcome, BlockOutcome::Processed) {
1078 metrics::NUM_ROUNDS_IN_CERTIFICATE
1079 .with_label_values(&[cert_str, round.type_name()])
1080 .observe(round.number() as f64);
1081 }
1082 }
1083 Ok((info, actions))
1084 }
1085
1086 #[instrument(skip_all, fields(
1088 nick = self.nickname,
1089 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1090 height = %certificate.inner().height(),
1091 ))]
1092 pub async fn handle_timeout_certificate(
1093 &self,
1094 certificate: TimeoutCertificate,
1095 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1096 trace!("{} <-- {:?}", self.nickname, certificate);
1097 self.process_timeout(certificate).await
1098 }
1099
1100 #[instrument(skip_all, fields(
1101 nick = self.nickname,
1102 chain_id = format!("{:.8}", query.chain_id)
1103 ))]
1104 pub async fn handle_chain_info_query(
1105 &self,
1106 query: ChainInfoQuery,
1107 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1108 trace!("{} <-- {:?}", self.nickname, query);
1109 #[cfg(with_metrics)]
1110 metrics::CHAIN_INFO_QUERIES.inc();
1111 let result = self
1112 .query_chain_worker(query.chain_id, move |callback| {
1113 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1114 })
1115 .await;
1116 trace!("{} --> {:?}", self.nickname, result);
1117 result
1118 }
1119
1120 #[instrument(skip_all, fields(
1121 nick = self.nickname,
1122 chain_id = format!("{:.8}", chain_id)
1123 ))]
1124 pub async fn download_pending_blob(
1125 &self,
1126 chain_id: ChainId,
1127 blob_id: BlobId,
1128 ) -> Result<Blob, WorkerError> {
1129 trace!(
1130 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1131 self.nickname
1132 );
1133 let result = self
1134 .query_chain_worker(chain_id, move |callback| {
1135 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1136 })
1137 .await;
1138 trace!(
1139 "{} --> {:?}",
1140 self.nickname,
1141 result.as_ref().map(|_| blob_id)
1142 );
1143 result
1144 }
1145
1146 #[instrument(skip_all, fields(
1147 nick = self.nickname,
1148 chain_id = format!("{:.8}", chain_id)
1149 ))]
1150 pub async fn handle_pending_blob(
1151 &self,
1152 chain_id: ChainId,
1153 blob: Blob,
1154 ) -> Result<ChainInfoResponse, WorkerError> {
1155 let blob_id = blob.id();
1156 trace!(
1157 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1158 self.nickname
1159 );
1160 let result = self
1161 .query_chain_worker(chain_id, move |callback| {
1162 ChainWorkerRequest::HandlePendingBlob { blob, callback }
1163 })
1164 .await;
1165 trace!(
1166 "{} --> {:?}",
1167 self.nickname,
1168 result.as_ref().map(|_| blob_id)
1169 );
1170 result
1171 }
1172
1173 #[instrument(skip_all, fields(
1174 nick = self.nickname,
1175 chain_id = format!("{:.8}", request.target_chain_id())
1176 ))]
1177 pub async fn handle_cross_chain_request(
1178 &self,
1179 request: CrossChainRequest,
1180 ) -> Result<NetworkActions, WorkerError> {
1181 trace!("{} <-- {:?}", self.nickname, request);
1182 match request {
1183 CrossChainRequest::UpdateRecipient {
1184 sender,
1185 recipient,
1186 bundles,
1187 } => {
1188 let mut actions = NetworkActions::default();
1189 let origin = sender;
1190 let Some(height) = self
1191 .process_cross_chain_update(origin, recipient, bundles)
1192 .await?
1193 else {
1194 return Ok(actions);
1195 };
1196 actions.notifications.push(Notification {
1197 chain_id: recipient,
1198 reason: Reason::NewIncomingBundle { origin, height },
1199 });
1200 actions
1201 .cross_chain_requests
1202 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1203 sender,
1204 recipient,
1205 latest_height: height,
1206 });
1207 Ok(actions)
1208 }
1209 CrossChainRequest::ConfirmUpdatedRecipient {
1210 sender,
1211 recipient,
1212 latest_height,
1213 } => {
1214 self.query_chain_worker(sender, move |callback| {
1215 ChainWorkerRequest::ConfirmUpdatedRecipient {
1216 recipient,
1217 latest_height,
1218 callback,
1219 }
1220 })
1221 .await?;
1222 Ok(NetworkActions::default())
1223 }
1224 }
1225 }
1226
1227 #[instrument(skip_all, fields(
1229 nickname = %self.nickname,
1230 chain_id = %chain_id,
1231 num_trackers = %new_trackers.len()
1232 ))]
1233 pub async fn update_received_certificate_trackers(
1234 &self,
1235 chain_id: ChainId,
1236 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1237 ) -> Result<(), WorkerError> {
1238 self.query_chain_worker(chain_id, move |callback| {
1239 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1240 new_trackers,
1241 callback,
1242 }
1243 })
1244 .await
1245 }
1246
1247 #[instrument(skip_all, fields(
1249 nickname = %self.nickname,
1250 chain_id = %chain_id,
1251 start = %start,
1252 end = %end
1253 ))]
1254 pub async fn get_preprocessed_block_hashes(
1255 &self,
1256 chain_id: ChainId,
1257 start: BlockHeight,
1258 end: BlockHeight,
1259 ) -> Result<Vec<CryptoHash>, WorkerError> {
1260 self.query_chain_worker(chain_id, move |callback| {
1261 ChainWorkerRequest::GetPreprocessedBlockHashes {
1262 start,
1263 end,
1264 callback,
1265 }
1266 })
1267 .await
1268 }
1269
1270 #[instrument(skip_all, fields(
1272 nickname = %self.nickname,
1273 chain_id = %chain_id,
1274 origin = %origin
1275 ))]
1276 pub async fn get_inbox_next_height(
1277 &self,
1278 chain_id: ChainId,
1279 origin: ChainId,
1280 ) -> Result<BlockHeight, WorkerError> {
1281 self.query_chain_worker(chain_id, move |callback| {
1282 ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1283 })
1284 .await
1285 }
1286
1287 #[instrument(skip_all, fields(
1290 nickname = %self.nickname,
1291 chain_id = %chain_id,
1292 num_blob_ids = %blob_ids.len()
1293 ))]
1294 pub async fn get_locking_blobs(
1295 &self,
1296 chain_id: ChainId,
1297 blob_ids: Vec<BlobId>,
1298 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1299 self.query_chain_worker(chain_id, move |callback| {
1300 ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1301 })
1302 .await
1303 }
1304
1305 pub async fn get_block_hashes(
1307 &self,
1308 chain_id: ChainId,
1309 heights: Vec<BlockHeight>,
1310 ) -> Result<Vec<CryptoHash>, WorkerError> {
1311 self.query_chain_worker(chain_id, move |callback| {
1312 ChainWorkerRequest::GetBlockHashes { heights, callback }
1313 })
1314 .await
1315 }
1316
1317 pub async fn get_proposed_blobs(
1319 &self,
1320 chain_id: ChainId,
1321 blob_ids: Vec<BlobId>,
1322 ) -> Result<Vec<Blob>, WorkerError> {
1323 self.query_chain_worker(chain_id, move |callback| {
1324 ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1325 })
1326 .await
1327 }
1328
1329 pub async fn get_event_subscriptions(
1331 &self,
1332 chain_id: ChainId,
1333 ) -> Result<EventSubscriptionsResult, WorkerError> {
1334 self.query_chain_worker(chain_id, |callback| {
1335 ChainWorkerRequest::GetEventSubscriptions { callback }
1336 })
1337 .await
1338 }
1339
1340 pub async fn get_next_expected_event(
1342 &self,
1343 chain_id: ChainId,
1344 stream_id: StreamId,
1345 ) -> Result<Option<u32>, WorkerError> {
1346 self.query_chain_worker(chain_id, move |callback| {
1347 ChainWorkerRequest::GetNextExpectedEvent {
1348 stream_id,
1349 callback,
1350 }
1351 })
1352 .await
1353 }
1354
1355 pub async fn get_received_certificate_trackers(
1357 &self,
1358 chain_id: ChainId,
1359 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1360 self.query_chain_worker(chain_id, |callback| {
1361 ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1362 })
1363 .await
1364 }
1365
1366 pub async fn get_tip_state_and_outbox_info(
1368 &self,
1369 chain_id: ChainId,
1370 receiver_id: ChainId,
1371 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1372 self.query_chain_worker(chain_id, move |callback| {
1373 ChainWorkerRequest::GetTipStateAndOutboxInfo {
1374 receiver_id,
1375 callback,
1376 }
1377 })
1378 .await
1379 }
1380
1381 pub async fn get_next_height_to_preprocess(
1383 &self,
1384 chain_id: ChainId,
1385 ) -> Result<BlockHeight, WorkerError> {
1386 self.query_chain_worker(chain_id, |callback| {
1387 ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1388 })
1389 .await
1390 }
1391}
1392
1393#[cfg(with_testing)]
1394impl<StorageClient> WorkerState<StorageClient>
1395where
1396 StorageClient: Storage,
1397{
1398 #[instrument(level = "trace", skip(self))]
1404 pub fn public_key(&self) -> ValidatorPublicKey {
1405 self.chain_worker_config
1406 .key_pair()
1407 .expect(
1408 "Test validator should have a key pair assigned to it \
1409 in order to obtain it's public key",
1410 )
1411 .public()
1412 }
1413}