1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15 doc_scalar,
16 hashed::Hashed,
17 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
18};
19#[cfg(with_testing)]
20use linera_chain::ChainExecutionContext;
21use linera_chain::{
22 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
23 types::{
24 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
25 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
26 },
27 ChainError, ChainStateView,
28};
29use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
30use linera_storage::Storage;
31use linera_views::{context::InactiveContext, ViewError};
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
35use tracing::{error, instrument, trace, warn};
36
37use crate::{
38 chain_worker::{
39 BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
40 },
41 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
42 join_set_ext::{JoinSet, JoinSetExt},
43 notifier::Notifier,
44 value_cache::ValueCache,
45 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
46};
47
48#[cfg(test)]
49#[path = "unit_tests/worker_tests.rs"]
50mod worker_tests;
51
/// Prometheus collectors used by the worker. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Histogram `num_rounds_in_certificate`, labelled by certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        let labels = ["certificate_value", "round_type"];
        let buckets = exponential_bucket_interval(0.1, 50.0);
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &labels,
            buckets,
        )
    });

    /// Histogram `num_rounds_in_block_proposal`, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        let labels = ["round_type"];
        let buckets = exponential_bucket_interval(0.1, 50.0);
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &labels,
            buckets,
        )
    });

    /// Unlabelled counter `transaction_count`.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Unlabelled counter `num_blocks`.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Counter `certificates_signed`, labelled by validator name.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let labels = ["validator_name"];
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &labels,
        )
    });

    /// Counter `chain_info_queries`.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });
}
102
103#[derive(Default, Debug)]
105pub struct NetworkActions {
106 pub cross_chain_requests: Vec<CrossChainRequest>,
108 pub notifications: Vec<Notification>,
110}
111
112impl NetworkActions {
113 pub fn extend(&mut self, other: NetworkActions) {
114 self.cross_chain_requests.extend(other.cross_chain_requests);
115 self.notifications.extend(other.notifications);
116 }
117}
118
119#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
120pub struct Notification {
122 pub chain_id: ChainId,
123 pub reason: Reason,
124}
125
126doc_scalar!(
127 Notification,
128 "Notify that a chain has a new certified block or a new message"
129);
130
131#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
132pub enum Reason {
134 NewBlock {
135 height: BlockHeight,
136 hash: CryptoHash,
137 },
138 NewEvents {
139 height: BlockHeight,
140 hash: CryptoHash,
141 event_streams: BTreeSet<StreamId>,
142 },
143 NewIncomingBundle {
144 origin: ChainId,
145 height: BlockHeight,
146 },
147 NewRound {
148 height: BlockHeight,
149 round: Round,
150 },
151}
152
153#[derive(Debug, Error)]
155pub enum WorkerError {
156 #[error(transparent)]
157 CryptoError(#[from] CryptoError),
158
159 #[error(transparent)]
160 ArithmeticError(#[from] ArithmeticError),
161
162 #[error(transparent)]
163 ViewError(#[from] ViewError),
164
165 #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
166 ReadCertificatesError(Vec<CryptoHash>),
167
168 #[error(transparent)]
169 ChainError(#[from] Box<ChainError>),
170
171 #[error("Block was not signed by an authorized owner")]
173 InvalidOwner,
174
175 #[error("Operations in the block are not authenticated by the proper owner: {0}")]
176 InvalidSigner(AccountOwner),
177
178 #[error(
180 "Chain is expecting a next block at height {expected_block_height} but the given block \
181 is at height {found_block_height} instead"
182 )]
183 UnexpectedBlockHeight {
184 expected_block_height: BlockHeight,
185 found_block_height: BlockHeight,
186 },
187 #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
188 InvalidEpoch {
189 chain_id: ChainId,
190 chain_epoch: Epoch,
191 epoch: Epoch,
192 },
193
194 #[error("Events not found: {0:?}")]
195 EventsNotFound(Vec<EventId>),
196
197 #[error("Invalid cross-chain request")]
199 InvalidCrossChainRequest,
200 #[error("The block does not contain the hash that we expected for the previous block")]
201 InvalidBlockChaining,
202 #[error(
203 "The given outcome is not what we computed after executing the block.\n\
204 Computed: {computed:#?}\n\
205 Submitted: {submitted:#?}"
206 )]
207 IncorrectOutcome {
208 computed: Box<BlockExecutionOutcome>,
209 submitted: Box<BlockExecutionOutcome>,
210 },
211 #[error("The block timestamp is in the future.")]
212 InvalidTimestamp,
213 #[error("We don't have the value for the certificate.")]
214 MissingCertificateValue,
215 #[error("The hash certificate doesn't match its value.")]
216 InvalidLiteCertificate,
217 #[error("Fast blocks cannot query oracles")]
218 FastBlockUsingOracles,
219 #[error("Blobs not found: {0:?}")]
220 BlobsNotFound(Vec<BlobId>),
221 #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
222 ConfirmedLogEntryNotFound {
223 height: BlockHeight,
224 chain_id: ChainId,
225 },
226 #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
227 PreprocessedBlocksEntryNotFound {
228 height: BlockHeight,
229 chain_id: ChainId,
230 },
231 #[error("The block proposal is invalid: {0}")]
232 InvalidBlockProposal(String),
233 #[error("Failed to join spawned worker task")]
234 JoinError,
235 #[error("Blob was not required by any pending block")]
236 UnexpectedBlob,
237 #[error("Number of published blobs per block must not exceed {0}")]
238 TooManyPublishedBlobs(u64),
239 #[error("Missing network description")]
240 MissingNetworkDescription,
241 #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
242 ChainActorSendError {
243 chain_id: ChainId,
244 error: Box<dyn std::error::Error + Send + Sync>,
245 },
246 #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
247 ChainActorRecvError {
248 chain_id: ChainId,
249 error: Box<dyn std::error::Error + Send + Sync>,
250 },
251}
252
253impl WorkerError {
254 pub fn is_local(&self) -> bool {
258 match self {
259 WorkerError::CryptoError(_)
260 | WorkerError::ArithmeticError(_)
261 | WorkerError::InvalidOwner
262 | WorkerError::InvalidSigner(_)
263 | WorkerError::UnexpectedBlockHeight { .. }
264 | WorkerError::InvalidEpoch { .. }
265 | WorkerError::EventsNotFound(_)
266 | WorkerError::InvalidBlockChaining
267 | WorkerError::IncorrectOutcome { .. }
268 | WorkerError::InvalidTimestamp
269 | WorkerError::MissingCertificateValue
270 | WorkerError::InvalidLiteCertificate
271 | WorkerError::FastBlockUsingOracles
272 | WorkerError::BlobsNotFound(_)
273 | WorkerError::InvalidBlockProposal(_)
274 | WorkerError::UnexpectedBlob
275 | WorkerError::TooManyPublishedBlobs(_)
276 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
277 WorkerError::InvalidCrossChainRequest
278 | WorkerError::ViewError(_)
279 | WorkerError::ConfirmedLogEntryNotFound { .. }
280 | WorkerError::PreprocessedBlocksEntryNotFound { .. }
281 | WorkerError::JoinError
282 | WorkerError::MissingNetworkDescription
283 | WorkerError::ChainActorSendError { .. }
284 | WorkerError::ChainActorRecvError { .. }
285 | WorkerError::ReadCertificatesError(_) => true,
286 WorkerError::ChainError(chain_error) => chain_error.is_local(),
287 }
288 }
289}
290
291impl From<ChainError> for WorkerError {
292 #[instrument(level = "trace", skip(chain_error))]
293 fn from(chain_error: ChainError) -> Self {
294 match chain_error {
295 ChainError::ExecutionError(execution_error, context) => {
296 if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
297 Self::BlobsNotFound(blob_ids)
298 } else {
299 Self::ChainError(Box::new(ChainError::ExecutionError(
300 execution_error,
301 context,
302 )))
303 }
304 }
305 error => Self::ChainError(Box::new(error)),
306 }
307 }
308}
309
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] nested inside this error.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not a `ChainError` wrapping a `ChainError::ExecutionError`, or if
    /// the wrapped context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            Self::ChainError(inner) => inner,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
331
332pub struct WorkerState<StorageClient>
334where
335 StorageClient: Storage,
336{
337 nickname: String,
339 storage: StorageClient,
341 chain_worker_config: ChainWorkerConfig,
343 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
344 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
345 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
347 delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
350 chain_worker_tasks: Arc<Mutex<JoinSet>>,
352 chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
354}
355
356impl<StorageClient> Clone for WorkerState<StorageClient>
357where
358 StorageClient: Storage + Clone,
359{
360 fn clone(&self) -> Self {
361 WorkerState {
362 nickname: self.nickname.clone(),
363 storage: self.storage.clone(),
364 chain_worker_config: self.chain_worker_config.clone(),
365 block_cache: self.block_cache.clone(),
366 execution_state_cache: self.execution_state_cache.clone(),
367 tracked_chains: self.tracked_chains.clone(),
368 delivery_notifiers: self.delivery_notifiers.clone(),
369 chain_worker_tasks: self.chain_worker_tasks.clone(),
370 chain_workers: self.chain_workers.clone(),
371 }
372 }
373}
374
375type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
377 ChainWorkerRequest<<StorageClient as Storage>::Context>,
378 tracing::Span,
379)>;
380
381pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
382
383impl<StorageClient> WorkerState<StorageClient>
384where
385 StorageClient: Storage,
386{
387 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
388 pub fn new(
389 nickname: String,
390 key_pair: Option<ValidatorSecretKey>,
391 storage: StorageClient,
392 block_cache_size: usize,
393 execution_state_cache_size: usize,
394 ) -> Self {
395 WorkerState {
396 nickname,
397 storage,
398 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
399 block_cache: Arc::new(ValueCache::new(block_cache_size)),
400 execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
401 tracked_chains: None,
402 delivery_notifiers: Arc::default(),
403 chain_worker_tasks: Arc::default(),
404 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
405 }
406 }
407
408 #[instrument(level = "trace", skip(nickname, storage))]
409 pub fn new_for_client(
410 nickname: String,
411 storage: StorageClient,
412 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
413 block_cache_size: usize,
414 execution_state_cache_size: usize,
415 ) -> Self {
416 WorkerState {
417 nickname,
418 storage,
419 chain_worker_config: ChainWorkerConfig::default(),
420 block_cache: Arc::new(ValueCache::new(block_cache_size)),
421 execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
422 tracked_chains: Some(tracked_chains),
423 delivery_notifiers: Arc::default(),
424 chain_worker_tasks: Arc::default(),
425 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
426 }
427 }
428
429 #[instrument(level = "trace", skip(self, value))]
430 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
431 self.chain_worker_config.allow_inactive_chains = value;
432 self
433 }
434
435 #[instrument(level = "trace", skip(self, value))]
436 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
437 self.chain_worker_config
438 .allow_messages_from_deprecated_epochs = value;
439 self
440 }
441
442 #[instrument(level = "trace", skip(self, value))]
443 pub fn with_long_lived_services(mut self, value: bool) -> Self {
444 self.chain_worker_config.long_lived_services = value;
445 self
446 }
447
448 #[instrument(level = "trace", skip(self))]
453 pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
454 self.chain_worker_config.grace_period = grace_period;
455 self
456 }
457
458 #[instrument(level = "trace", skip(self))]
462 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
463 self.chain_worker_config.ttl = chain_worker_ttl;
464 self
465 }
466
467 #[instrument(level = "trace", skip(self))]
471 pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
472 self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
473 self
474 }
475
476 #[instrument(level = "trace", skip(self))]
480 pub fn with_chain_info_max_received_log_entries(
481 mut self,
482 chain_info_max_received_log_entries: usize,
483 ) -> Self {
484 if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
485 warn!(
486 "The value set for the maximum size of received_log entries \
487 may not be compatible with the latest clients: {} instead of {}",
488 chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
489 );
490 }
491 self.chain_worker_config.chain_info_max_received_log_entries =
492 chain_info_max_received_log_entries;
493 self
494 }
495
496 #[instrument(level = "trace", skip(self))]
497 pub fn nickname(&self) -> &str {
498 &self.nickname
499 }
500
501 #[instrument(level = "trace", skip(self))]
503 #[cfg(not(feature = "test"))]
504 pub(crate) fn storage_client(&self) -> &StorageClient {
505 &self.storage
506 }
507
508 #[instrument(level = "trace", skip(self))]
511 #[cfg(feature = "test")]
512 pub fn storage_client(&self) -> &StorageClient {
513 &self.storage
514 }
515
516 #[instrument(level = "trace", skip(self, certificate))]
517 pub(crate) async fn full_certificate(
518 &self,
519 certificate: LiteCertificate<'_>,
520 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
521 let block = self
522 .block_cache
523 .get(&certificate.value.value_hash)
524 .ok_or(WorkerError::MissingCertificateValue)?;
525
526 match certificate.value.kind {
527 linera_chain::types::CertificateKind::Confirmed => {
528 let value = ConfirmedBlock::from_hashed(block);
529 Ok(Either::Left(
530 certificate
531 .with_value(value)
532 .ok_or(WorkerError::InvalidLiteCertificate)?,
533 ))
534 }
535 linera_chain::types::CertificateKind::Validated => {
536 let value = ValidatedBlock::from_hashed(block);
537 Ok(Either::Right(
538 certificate
539 .with_value(value)
540 .ok_or(WorkerError::InvalidLiteCertificate)?,
541 ))
542 }
543 _ => Err(WorkerError::InvalidLiteCertificate),
544 }
545 }
546}
547
548#[allow(async_fn_in_trait)]
549#[cfg_attr(not(web), trait_variant::make(Send))]
550pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
551 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
552 worker: &WorkerState<S>,
553 certificate: GenericCertificate<Self>,
554 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
555}
556
557impl ProcessableCertificate for ConfirmedBlock {
558 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
559 worker: &WorkerState<S>,
560 certificate: ConfirmedBlockCertificate,
561 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
562 worker.handle_confirmed_certificate(certificate, None).await
563 }
564}
565
566impl ProcessableCertificate for ValidatedBlock {
567 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
568 worker: &WorkerState<S>,
569 certificate: ValidatedBlockCertificate,
570 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
571 worker.handle_validated_certificate(certificate).await
572 }
573}
574
575impl ProcessableCertificate for Timeout {
576 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
577 worker: &WorkerState<S>,
578 certificate: TimeoutCertificate,
579 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
580 worker.handle_timeout_certificate(certificate).await
581 }
582}
583
584impl<StorageClient> WorkerState<StorageClient>
585where
586 StorageClient: Storage + Clone + Send + Sync + 'static,
587{
588 #[instrument(level = "trace", skip(self, certificate, notifier))]
589 #[inline]
590 pub async fn fully_handle_certificate_with_notifications<T>(
591 &self,
592 certificate: GenericCertificate<T>,
593 notifier: &impl Notifier,
594 ) -> Result<ChainInfoResponse, WorkerError>
595 where
596 T: ProcessableCertificate,
597 {
598 let notifications = (*notifier).clone();
599 let this = self.clone();
600 linera_base::task::spawn(async move {
601 let (response, actions) =
602 ProcessableCertificate::process_certificate(&this, certificate).await?;
603 notifications.notify(&actions.notifications);
604 let mut requests = VecDeque::from(actions.cross_chain_requests);
605 while let Some(request) = requests.pop_front() {
606 let actions = this.handle_cross_chain_request(request).await?;
607 requests.extend(actions.cross_chain_requests);
608 notifications.notify(&actions.notifications);
609 }
610 Ok(response)
611 })
612 .await
613 .unwrap_or_else(|_| Err(WorkerError::JoinError))
614 }
615
616 #[instrument(level = "trace", skip(self, block))]
618 pub async fn stage_block_execution(
619 &self,
620 block: ProposedBlock,
621 round: Option<u32>,
622 published_blobs: Vec<Blob>,
623 ) -> Result<(Block, ChainInfoResponse), WorkerError> {
624 self.query_chain_worker(block.chain_id, move |callback| {
625 ChainWorkerRequest::StageBlockExecution {
626 block,
627 round,
628 published_blobs,
629 callback,
630 }
631 })
632 .await
633 }
634
635 #[instrument(level = "trace", skip(self, chain_id, query))]
637 pub async fn query_application(
638 &self,
639 chain_id: ChainId,
640 query: Query,
641 ) -> Result<QueryOutcome, WorkerError> {
642 self.query_chain_worker(chain_id, move |callback| {
643 ChainWorkerRequest::QueryApplication { query, callback }
644 })
645 .await
646 }
647
648 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
649 nickname = %self.nickname,
650 chain_id = %chain_id,
651 application_id = %application_id
652 ))]
653 pub async fn describe_application(
654 &self,
655 chain_id: ChainId,
656 application_id: ApplicationId,
657 ) -> Result<ApplicationDescription, WorkerError> {
658 self.query_chain_worker(chain_id, move |callback| {
659 ChainWorkerRequest::DescribeApplication {
660 application_id,
661 callback,
662 }
663 })
664 .await
665 }
666
667 #[instrument(
669 level = "trace",
670 skip(self, certificate, notify_when_messages_are_delivered),
671 fields(
672 nickname = %self.nickname,
673 chain_id = %certificate.block().header.chain_id,
674 block_height = %certificate.block().header.height
675 )
676 )]
677 async fn process_confirmed_block(
678 &self,
679 certificate: ConfirmedBlockCertificate,
680 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
681 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
682 let chain_id = certificate.block().header.chain_id;
683 self.query_chain_worker(chain_id, move |callback| {
684 ChainWorkerRequest::ProcessConfirmedBlock {
685 certificate,
686 notify_when_messages_are_delivered,
687 callback,
688 }
689 })
690 .await
691 }
692
693 #[instrument(level = "trace", skip(self, certificate), fields(
695 nickname = %self.nickname,
696 chain_id = %certificate.block().header.chain_id,
697 block_height = %certificate.block().header.height
698 ))]
699 async fn process_validated_block(
700 &self,
701 certificate: ValidatedBlockCertificate,
702 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
703 let chain_id = certificate.block().header.chain_id;
704 self.query_chain_worker(chain_id, move |callback| {
705 ChainWorkerRequest::ProcessValidatedBlock {
706 certificate,
707 callback,
708 }
709 })
710 .await
711 }
712
713 #[instrument(level = "trace", skip(self, certificate), fields(
715 nickname = %self.nickname,
716 chain_id = %certificate.value().chain_id(),
717 height = %certificate.value().height()
718 ))]
719 async fn process_timeout(
720 &self,
721 certificate: TimeoutCertificate,
722 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
723 let chain_id = certificate.value().chain_id();
724 self.query_chain_worker(chain_id, move |callback| {
725 ChainWorkerRequest::ProcessTimeout {
726 certificate,
727 callback,
728 }
729 })
730 .await
731 }
732
733 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
734 nickname = %self.nickname,
735 origin = %origin,
736 recipient = %recipient,
737 num_bundles = %bundles.len()
738 ))]
739 async fn process_cross_chain_update(
740 &self,
741 origin: ChainId,
742 recipient: ChainId,
743 bundles: Vec<(Epoch, MessageBundle)>,
744 ) -> Result<Option<BlockHeight>, WorkerError> {
745 self.query_chain_worker(recipient, move |callback| {
746 ChainWorkerRequest::ProcessCrossChainUpdate {
747 origin,
748 bundles,
749 callback,
750 }
751 })
752 .await
753 }
754
755 #[instrument(level = "trace", skip(self, chain_id, height), fields(
757 nickname = %self.nickname,
758 chain_id = %chain_id,
759 height = %height
760 ))]
761 #[cfg(with_testing)]
762 pub async fn read_certificate(
763 &self,
764 chain_id: ChainId,
765 height: BlockHeight,
766 ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
767 self.query_chain_worker(chain_id, move |callback| {
768 ChainWorkerRequest::ReadCertificate { height, callback }
769 })
770 .await
771 }
772
773 #[instrument(level = "trace", skip(self), fields(
779 nickname = %self.nickname,
780 chain_id = %chain_id
781 ))]
782 pub async fn chain_state_view(
783 &self,
784 chain_id: ChainId,
785 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
786 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
787 callback,
788 })
789 .await
790 }
791
792 #[instrument(level = "trace", skip(self, request_builder), fields(
794 nickname = %self.nickname,
795 chain_id = %chain_id
796 ))]
797 async fn query_chain_worker<Response>(
798 &self,
799 chain_id: ChainId,
800 request_builder: impl FnOnce(
801 oneshot::Sender<Result<Response, WorkerError>>,
802 ) -> ChainWorkerRequest<StorageClient::Context>,
803 ) -> Result<Response, WorkerError> {
804 let (callback, response) = oneshot::channel();
806 let request = request_builder(callback);
807
808 let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
810
811 if let Some(receiver) = new_receiver {
813 let delivery_notifier = self
814 .delivery_notifiers
815 .lock()
816 .unwrap()
817 .entry(chain_id)
818 .or_default()
819 .clone();
820
821 let is_tracked = self
822 .tracked_chains
823 .as_ref()
824 .is_some_and(|tracked_chains| tracked_chains.read().unwrap().contains(&chain_id));
825
826 let actor_task = ChainWorkerActor::run(
827 self.chain_worker_config.clone(),
828 self.storage.clone(),
829 self.block_cache.clone(),
830 self.execution_state_cache.clone(),
831 self.tracked_chains.clone(),
832 delivery_notifier,
833 chain_id,
834 receiver,
835 is_tracked,
836 );
837
838 self.chain_worker_tasks
839 .lock()
840 .unwrap()
841 .spawn_task(actor_task);
842 }
843
844 match response.await {
846 Err(e) => {
847 Err(WorkerError::ChainActorRecvError {
849 chain_id,
850 error: Box::new(e),
851 })
852 }
853 Ok(response) => response,
854 }
855 }
856
857 #[instrument(level = "trace", skip(self), fields(
859 nickname = %self.nickname,
860 chain_id = %chain_id
861 ))]
862 #[expect(clippy::type_complexity)]
863 fn call_and_maybe_create_chain_worker_endpoint(
864 &self,
865 chain_id: ChainId,
866 request: ChainWorkerRequest<StorageClient::Context>,
867 ) -> Result<
868 Option<
869 mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
870 >,
871 WorkerError,
872 > {
873 let mut chain_workers = self.chain_workers.lock().unwrap();
874
875 let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
876 (endpoint, None)
877 } else {
878 let (sender, receiver) = mpsc::unbounded_channel();
879 (sender, Some(receiver))
880 };
881
882 if let Err(e) = sender.send((request, tracing::Span::current())) {
883 return Err(WorkerError::ChainActorSendError {
885 chain_id,
886 error: Box::new(e),
887 });
888 }
889
890 chain_workers.insert(chain_id, sender);
892
893 Ok(new_receiver)
894 }
895
896 #[instrument(skip_all, fields(
897 nick = self.nickname,
898 chain_id = format!("{:.8}", proposal.content.block.chain_id),
899 height = %proposal.content.block.height,
900 ))]
901 pub async fn handle_block_proposal(
902 &self,
903 proposal: BlockProposal,
904 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
905 trace!("{} <-- {:?}", self.nickname, proposal);
906 #[cfg(with_metrics)]
907 let round = proposal.content.round;
908 let response = self
909 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
910 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
911 })
912 .await?;
913 #[cfg(with_metrics)]
914 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
915 .with_label_values(&[round.type_name()])
916 .observe(round.number() as f64);
917 Ok(response)
918 }
919
920 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
923 pub async fn handle_lite_certificate(
924 &self,
925 certificate: LiteCertificate<'_>,
926 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
927 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
928 match self.full_certificate(certificate).await? {
929 Either::Left(confirmed) => {
930 self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
931 .await
932 }
933 Either::Right(validated) => {
934 if let Some(notifier) = notify_when_messages_are_delivered {
935 if let Err(()) = notifier.send(()) {
937 warn!("Failed to notify message delivery to caller");
938 }
939 }
940 self.handle_validated_certificate(validated).await
941 }
942 }
943 }
944
945 #[instrument(skip_all, fields(
947 nick = self.nickname,
948 chain_id = format!("{:.8}", certificate.block().header.chain_id),
949 height = %certificate.block().header.height,
950 ))]
951 pub async fn handle_confirmed_certificate(
952 &self,
953 certificate: ConfirmedBlockCertificate,
954 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
955 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
956 trace!("{} <-- {:?}", self.nickname, certificate);
957 #[cfg(with_metrics)]
958 let metrics_data = (
959 certificate.inner().to_log_str(),
960 certificate.round.type_name(),
961 certificate.round.number(),
962 certificate.block().body.transactions.len() as u64,
963 certificate
964 .signatures()
965 .iter()
966 .map(|(validator_name, _)| validator_name.to_string())
967 .collect::<Vec<_>>(),
968 );
969
970 let (info, actions, _outcome) = self
971 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
972 .await?;
973
974 #[cfg(with_metrics)]
975 {
976 if matches!(_outcome, BlockOutcome::Processed) {
977 let (
978 certificate_log_str,
979 round_type,
980 round_number,
981 confirmed_transactions,
982 validators_with_signatures,
983 ) = metrics_data;
984 metrics::NUM_BLOCKS.with_label_values(&[]).inc();
985 metrics::NUM_ROUNDS_IN_CERTIFICATE
986 .with_label_values(&[certificate_log_str, round_type])
987 .observe(round_number as f64);
988 if confirmed_transactions > 0 {
989 metrics::TRANSACTION_COUNT
990 .with_label_values(&[])
991 .inc_by(confirmed_transactions);
992 }
993
994 for validator_name in validators_with_signatures {
995 metrics::CERTIFICATES_SIGNED
996 .with_label_values(&[&validator_name])
997 .inc();
998 }
999 }
1000 }
1001 Ok((info, actions))
1002 }
1003
1004 #[instrument(skip_all, fields(
1006 nick = self.nickname,
1007 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1008 height = %certificate.block().header.height,
1009 ))]
1010 pub async fn handle_validated_certificate(
1011 &self,
1012 certificate: ValidatedBlockCertificate,
1013 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1014 trace!("{} <-- {:?}", self.nickname, certificate);
1015
1016 #[cfg(with_metrics)]
1017 let round = certificate.round;
1018 #[cfg(with_metrics)]
1019 let cert_str = certificate.inner().to_log_str();
1020
1021 let (info, actions, _outcome) = self.process_validated_block(certificate).await?;
1022 #[cfg(with_metrics)]
1023 {
1024 if matches!(_outcome, BlockOutcome::Processed) {
1025 metrics::NUM_ROUNDS_IN_CERTIFICATE
1026 .with_label_values(&[cert_str, round.type_name()])
1027 .observe(round.number() as f64);
1028 }
1029 }
1030 Ok((info, actions))
1031 }
1032
1033 #[instrument(skip_all, fields(
1035 nick = self.nickname,
1036 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1037 height = %certificate.inner().height(),
1038 ))]
1039 pub async fn handle_timeout_certificate(
1040 &self,
1041 certificate: TimeoutCertificate,
1042 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1043 trace!("{} <-- {:?}", self.nickname, certificate);
1044 self.process_timeout(certificate).await
1045 }
1046
1047 #[instrument(skip_all, fields(
1048 nick = self.nickname,
1049 chain_id = format!("{:.8}", query.chain_id)
1050 ))]
1051 pub async fn handle_chain_info_query(
1052 &self,
1053 query: ChainInfoQuery,
1054 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1055 trace!("{} <-- {:?}", self.nickname, query);
1056 #[cfg(with_metrics)]
1057 metrics::CHAIN_INFO_QUERIES.inc();
1058 let result = self
1059 .query_chain_worker(query.chain_id, move |callback| {
1060 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1061 })
1062 .await;
1063 trace!("{} --> {:?}", self.nickname, result);
1064 result
1065 }
1066
1067 #[instrument(skip_all, fields(
1068 nick = self.nickname,
1069 chain_id = format!("{:.8}", chain_id)
1070 ))]
1071 pub async fn download_pending_blob(
1072 &self,
1073 chain_id: ChainId,
1074 blob_id: BlobId,
1075 ) -> Result<Blob, WorkerError> {
1076 trace!(
1077 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1078 self.nickname
1079 );
1080 let result = self
1081 .query_chain_worker(chain_id, move |callback| {
1082 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1083 })
1084 .await;
1085 trace!(
1086 "{} --> {:?}",
1087 self.nickname,
1088 result.as_ref().map(|_| blob_id)
1089 );
1090 result
1091 }
1092
1093 #[instrument(skip_all, fields(
1094 nick = self.nickname,
1095 chain_id = format!("{:.8}", chain_id)
1096 ))]
1097 pub async fn handle_pending_blob(
1098 &self,
1099 chain_id: ChainId,
1100 blob: Blob,
1101 ) -> Result<ChainInfoResponse, WorkerError> {
1102 let blob_id = blob.id();
1103 trace!(
1104 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1105 self.nickname
1106 );
1107 let result = self
1108 .query_chain_worker(chain_id, move |callback| {
1109 ChainWorkerRequest::HandlePendingBlob { blob, callback }
1110 })
1111 .await;
1112 trace!(
1113 "{} --> {:?}",
1114 self.nickname,
1115 result.as_ref().map(|_| blob_id)
1116 );
1117 result
1118 }
1119
1120 #[instrument(skip_all, fields(
1121 nick = self.nickname,
1122 chain_id = format!("{:.8}", request.target_chain_id())
1123 ))]
1124 pub async fn handle_cross_chain_request(
1125 &self,
1126 request: CrossChainRequest,
1127 ) -> Result<NetworkActions, WorkerError> {
1128 trace!("{} <-- {:?}", self.nickname, request);
1129 match request {
1130 CrossChainRequest::UpdateRecipient {
1131 sender,
1132 recipient,
1133 bundles,
1134 } => {
1135 let mut actions = NetworkActions::default();
1136 let origin = sender;
1137 let Some(height) = self
1138 .process_cross_chain_update(origin, recipient, bundles)
1139 .await?
1140 else {
1141 return Ok(actions);
1142 };
1143 actions.notifications.push(Notification {
1144 chain_id: recipient,
1145 reason: Reason::NewIncomingBundle { origin, height },
1146 });
1147 actions
1148 .cross_chain_requests
1149 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1150 sender,
1151 recipient,
1152 latest_height: height,
1153 });
1154 Ok(actions)
1155 }
1156 CrossChainRequest::ConfirmUpdatedRecipient {
1157 sender,
1158 recipient,
1159 latest_height,
1160 } => {
1161 self.query_chain_worker(sender, move |callback| {
1162 ChainWorkerRequest::ConfirmUpdatedRecipient {
1163 recipient,
1164 latest_height,
1165 callback,
1166 }
1167 })
1168 .await?;
1169 Ok(NetworkActions::default())
1170 }
1171 }
1172 }
1173
1174 #[instrument(skip_all, fields(
1176 nickname = %self.nickname,
1177 chain_id = %chain_id,
1178 num_trackers = %new_trackers.len()
1179 ))]
1180 pub async fn update_received_certificate_trackers(
1181 &self,
1182 chain_id: ChainId,
1183 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1184 ) -> Result<(), WorkerError> {
1185 self.query_chain_worker(chain_id, move |callback| {
1186 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1187 new_trackers,
1188 callback,
1189 }
1190 })
1191 .await
1192 }
1193
1194 #[instrument(skip_all, fields(
1196 nickname = %self.nickname,
1197 chain_id = %chain_id,
1198 start = %start,
1199 end = %end
1200 ))]
1201 pub async fn get_preprocessed_block_hashes(
1202 &self,
1203 chain_id: ChainId,
1204 start: BlockHeight,
1205 end: BlockHeight,
1206 ) -> Result<Vec<CryptoHash>, WorkerError> {
1207 self.query_chain_worker(chain_id, move |callback| {
1208 ChainWorkerRequest::GetPreprocessedBlockHashes {
1209 start,
1210 end,
1211 callback,
1212 }
1213 })
1214 .await
1215 }
1216
1217 #[instrument(skip_all, fields(
1219 nickname = %self.nickname,
1220 chain_id = %chain_id,
1221 origin = %origin
1222 ))]
1223 pub async fn get_inbox_next_height(
1224 &self,
1225 chain_id: ChainId,
1226 origin: ChainId,
1227 ) -> Result<BlockHeight, WorkerError> {
1228 self.query_chain_worker(chain_id, move |callback| {
1229 ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1230 })
1231 .await
1232 }
1233
1234 #[instrument(skip_all, fields(
1237 nickname = %self.nickname,
1238 chain_id = %chain_id,
1239 num_blob_ids = %blob_ids.len()
1240 ))]
1241 pub async fn get_locking_blobs(
1242 &self,
1243 chain_id: ChainId,
1244 blob_ids: Vec<BlobId>,
1245 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1246 self.query_chain_worker(chain_id, move |callback| {
1247 ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1248 })
1249 .await
1250 }
1251
1252 #[instrument(skip_all, fields(
1254 nickname = %self.nickname,
1255 chain_id = %chain_id,
1256 start = %start,
1257 end = %end
1258 ))]
1259 pub async fn read_confirmed_log(
1260 &self,
1261 chain_id: ChainId,
1262 start: BlockHeight,
1263 end: BlockHeight,
1264 ) -> Result<Vec<CryptoHash>, WorkerError> {
1265 self.query_chain_worker(chain_id, move |callback| {
1266 ChainWorkerRequest::ReadConfirmedLog {
1267 start,
1268 end,
1269 callback,
1270 }
1271 })
1272 .await
1273 }
1274}
1275
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the public key of the key pair held in this worker's chain worker
    /// configuration.
    ///
    /// Only compiled when `with_testing` is set.
    ///
    /// # Panics
    ///
    /// Panics if the configuration has no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        self.chain_worker_config
            .key_pair()
            .expect(
                "Test validator should have a key pair assigned to it \
                in order to obtain its public key",
            )
            .public()
    }
}