1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15 doc_scalar,
16 hashed::Hashed,
17 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
18 time::timer::{sleep, timeout},
19};
20#[cfg(with_testing)]
21use linera_chain::ChainExecutionContext;
22use linera_chain::{
23 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24 types::{
25 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
26 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
27 },
28 ChainError, ChainStateView,
29};
30use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
31use linera_storage::Storage;
32use linera_views::{context::InactiveContext, ViewError};
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
36use tracing::{error, instrument, trace, warn};
37
38use crate::{
39 chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
40 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
41 join_set_ext::{JoinSet, JoinSetExt},
42 notifier::Notifier,
43 value_cache::ValueCache,
44};
45
// Unit tests for this module. They are compiled only for test builds and are
// loaded from `unit_tests/worker_tests.rs` through the `#[path]` attribute.
#[cfg(test)]
#[path = "unit_tests/worker_tests.rs"]
mod worker_tests;
49
// Prometheus metrics recorded by the worker. The whole module is compiled only
// when the `with_metrics` cfg is set; every metric is registered lazily on first use.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of the round number of processed certificates, labelled by
    /// certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number of handled block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Counter of transactions; incremented by the number of transactions of each
    /// confirmed block that is counted by `handle_confirmed_certificate`.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Counter of blocks added to chains.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Counter of confirmed block certificates signed, labelled by validator name.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Counter of chain info queries; incremented once per `handle_chain_info_query` call.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });
}
100
/// Follow-up work returned by the worker's handlers alongside their response.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// Cross-chain requests still to be handled (see `handle_cross_chain_request`).
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// Notifications to be passed on to a `Notifier`.
    pub notifications: Vec<Notification>,
}
109
110impl NetworkActions {
111 pub fn extend(&mut self, other: NetworkActions) {
112 self.cross_chain_requests.extend(other.cross_chain_requests);
113 self.notifications.extend(other.notifications);
114 }
115}
116
/// A notification about a chain, consisting of the chain concerned and the reason.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Notification {
    /// The chain the notification is about.
    pub chain_id: ChainId,
    /// What happened on that chain.
    pub reason: Reason,
}

doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
128
/// The reason carried by a [`Notification`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Reason {
    /// A new block, identified by its height and hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// New events in the listed streams, tied to a block height and hash.
    NewEvents {
        height: BlockHeight,
        hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
    /// A new incoming bundle from `origin`; pushed by `handle_cross_chain_request`
    /// when a cross-chain update yields a height.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round at the given height.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
}
150
/// Error type returned by the worker's handlers.
///
/// The `#[error(..)]` attributes define the `Display` output of each variant;
/// `transparent` variants forward to the wrapped error.
#[derive(Debug, Error)]
pub enum WorkerError {
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    // Boxed chain error. A plain `ChainError` is converted through the manual
    // `From<ChainError>` impl, which maps missing-blob execution errors to `BlobsNotFound`.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    // Chain access control
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    #[error(
        "Was expecting block height {expected_block_height} but found {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    #[error(
        "The given outcome is not what we computed after executing the block.\n\
        Computed: {computed:#?}\n\
        Submitted: {submitted:#?}"
    )]
    IncorrectOutcome {
        computed: Box<BlockExecutionOutcome>,
        submitted: Box<BlockExecutionOutcome>,
    },
    #[error("The block timestamp is in the future.")]
    InvalidTimestamp,
    // Returned by `full_certificate` when the block cache has no entry for the certificate's hash.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    // Returned by `full_certificate` when the lite certificate cannot be combined with the cached value.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    // Returned by `get_chain_worker_endpoint` when no endpoint could be obtained before its timeout.
    #[error("The worker is too busy to handle new chains")]
    FullChainWorkerCache,
    // Returned by `fully_handle_certificate_with_notifications` when the spawned task cannot be joined.
    #[error("Failed to join spawned worker task")]
    JoinError,
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error("Missing network description")]
    MissingNetworkDescription,
}
241
242impl From<ChainError> for WorkerError {
243 #[instrument(level = "trace", skip(chain_error))]
244 fn from(chain_error: ChainError) -> Self {
245 match chain_error {
246 ChainError::ExecutionError(execution_error, context) => {
247 if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
248 Self::BlobsNotFound(blob_ids)
249 } else {
250 Self::ChainError(Box::new(ChainError::ExecutionError(
251 execution_error,
252 context,
253 )))
254 }
255 }
256 error => Self::ChainError(Box::new(error)),
257 }
258 }
259}
260
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] carried by this error (testing helper).
    ///
    /// # Panics
    ///
    /// Panics if `self` is not a `ChainError` wrapping a `ChainError::ExecutionError`,
    /// or if the execution context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let boxed_chain_error = match self {
            WorkerError::ChainError(boxed) => boxed,
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        let (execution_error, actual_context) = match *boxed_chain_error {
            ChainError::ExecutionError(error, context) => (error, context),
            other => panic!("Expected an `ExecutionError`. Got: {other:#?}"),
        };

        assert_eq!(actual_context, expected_context);

        *execution_error
    }
}
282
/// State of a worker: storage access, configuration, caches and the endpoints of
/// the per-chain worker actors.
pub struct WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Name used in log messages and tracing fields.
    nickname: String,
    /// Storage client handed to every chain worker actor.
    storage: StorageClient,
    /// Configuration cloned into every chain worker actor.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of hashed blocks, used to rebuild full certificates from lite ones.
    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Cache of execution state views, shared with the chain worker actors.
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
    /// Shared set of tracked chains; `None` unless built with `new_for_client`.
    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
    /// One `DeliveryNotifier` per chain, created on demand.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// Set of tasks on which the chain worker actors are spawned.
    chain_worker_tasks: Arc<Mutex<JoinSet>>,
    /// Request endpoints of the chain worker actors, keyed by chain.
    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
}
306
307impl<StorageClient> Clone for WorkerState<StorageClient>
308where
309 StorageClient: Storage + Clone,
310{
311 fn clone(&self) -> Self {
312 WorkerState {
313 nickname: self.nickname.clone(),
314 storage: self.storage.clone(),
315 chain_worker_config: self.chain_worker_config.clone(),
316 block_cache: self.block_cache.clone(),
317 execution_state_cache: self.execution_state_cache.clone(),
318 tracked_chains: self.tracked_chains.clone(),
319 delivery_notifiers: self.delivery_notifiers.clone(),
320 chain_worker_tasks: self.chain_worker_tasks.clone(),
321 chain_workers: self.chain_workers.clone(),
322 }
323 }
324}
325
/// Sending half of the unbounded channel used to talk to a chain worker actor.
/// Each message is a request paired with the tracing span it was issued in.
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
    ChainWorkerRequest<<StorageClient as Storage>::Context>,
    tracing::Span,
)>;

/// The `DeliveryNotifier`s of the worker, keyed by chain.
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
333
334impl<StorageClient> WorkerState<StorageClient>
335where
336 StorageClient: Storage,
337{
338 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
339 pub fn new(
340 nickname: String,
341 key_pair: Option<ValidatorSecretKey>,
342 storage: StorageClient,
343 ) -> Self {
344 WorkerState {
345 nickname,
346 storage,
347 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
348 block_cache: Arc::new(ValueCache::default()),
349 execution_state_cache: Arc::new(ValueCache::default()),
350 tracked_chains: None,
351 delivery_notifiers: Arc::default(),
352 chain_worker_tasks: Arc::default(),
353 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
354 }
355 }
356
357 #[instrument(level = "trace", skip(nickname, storage))]
358 pub fn new_for_client(
359 nickname: String,
360 storage: StorageClient,
361 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
362 ) -> Self {
363 WorkerState {
364 nickname,
365 storage,
366 chain_worker_config: ChainWorkerConfig::default(),
367 block_cache: Arc::new(ValueCache::default()),
368 execution_state_cache: Arc::new(ValueCache::default()),
369 tracked_chains: Some(tracked_chains),
370 delivery_notifiers: Arc::default(),
371 chain_worker_tasks: Arc::default(),
372 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
373 }
374 }
375
376 #[instrument(level = "trace", skip(self, value))]
377 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
378 self.chain_worker_config.allow_inactive_chains = value;
379 self
380 }
381
382 #[instrument(level = "trace", skip(self, value))]
383 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
384 self.chain_worker_config
385 .allow_messages_from_deprecated_epochs = value;
386 self
387 }
388
389 #[instrument(level = "trace", skip(self, value))]
390 pub fn with_long_lived_services(mut self, value: bool) -> Self {
391 self.chain_worker_config.long_lived_services = value;
392 self
393 }
394
395 #[instrument(level = "trace", skip(self))]
400 pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
401 self.chain_worker_config.grace_period = grace_period;
402 self
403 }
404
405 #[instrument(level = "trace", skip(self))]
409 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
410 self.chain_worker_config.ttl = chain_worker_ttl;
411 self
412 }
413
414 #[instrument(level = "trace", skip(self))]
415 pub fn nickname(&self) -> &str {
416 &self.nickname
417 }
418
419 #[instrument(level = "trace", skip(self))]
421 #[cfg(not(feature = "test"))]
422 pub(crate) fn storage_client(&self) -> &StorageClient {
423 &self.storage
424 }
425
426 #[instrument(level = "trace", skip(self))]
429 #[cfg(feature = "test")]
430 pub fn storage_client(&self) -> &StorageClient {
431 &self.storage
432 }
433
434 #[instrument(level = "trace", skip(self, certificate))]
435 pub(crate) async fn full_certificate(
436 &self,
437 certificate: LiteCertificate<'_>,
438 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
439 let block = self
440 .block_cache
441 .get(&certificate.value.value_hash)
442 .ok_or(WorkerError::MissingCertificateValue)?;
443
444 match certificate.value.kind {
445 linera_chain::types::CertificateKind::Confirmed => {
446 let value = ConfirmedBlock::from_hashed(block);
447 Ok(Either::Left(
448 certificate
449 .with_value(value)
450 .ok_or(WorkerError::InvalidLiteCertificate)?,
451 ))
452 }
453 linera_chain::types::CertificateKind::Validated => {
454 let value = ValidatedBlock::from_hashed(block);
455 Ok(Either::Right(
456 certificate
457 .with_value(value)
458 .ok_or(WorkerError::InvalidLiteCertificate)?,
459 ))
460 }
461 _ => return Err(WorkerError::InvalidLiteCertificate),
462 }
463 }
464}
465
/// Certificate values that a [`WorkerState`] knows how to process.
///
/// Outside of `web` builds, `trait_variant::make(Send)` is applied to the trait.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker`, returning the chain info response
    /// and the resulting network actions.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
474
475impl ProcessableCertificate for ConfirmedBlock {
476 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
477 worker: &WorkerState<S>,
478 certificate: ConfirmedBlockCertificate,
479 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
480 worker.handle_confirmed_certificate(certificate, None).await
481 }
482}
483
484impl ProcessableCertificate for ValidatedBlock {
485 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
486 worker: &WorkerState<S>,
487 certificate: ValidatedBlockCertificate,
488 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
489 worker.handle_validated_certificate(certificate).await
490 }
491}
492
493impl ProcessableCertificate for Timeout {
494 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
495 worker: &WorkerState<S>,
496 certificate: TimeoutCertificate,
497 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
498 worker.handle_timeout_certificate(certificate).await
499 }
500}
501
502impl<StorageClient> WorkerState<StorageClient>
503where
504 StorageClient: Storage + Clone + Send + Sync + 'static,
505{
506 #[instrument(level = "trace", skip(self, certificate, notifier))]
507 #[inline]
508 pub async fn fully_handle_certificate_with_notifications<T>(
509 &self,
510 certificate: GenericCertificate<T>,
511 notifier: &impl Notifier,
512 ) -> Result<ChainInfoResponse, WorkerError>
513 where
514 T: ProcessableCertificate,
515 {
516 let notifications = (*notifier).clone();
517 let this = self.clone();
518 linera_base::task::spawn(async move {
519 let (response, actions) =
520 ProcessableCertificate::process_certificate(&this, certificate).await?;
521 notifications.notify(&actions.notifications);
522 let mut requests = VecDeque::from(actions.cross_chain_requests);
523 while let Some(request) = requests.pop_front() {
524 let actions = this.handle_cross_chain_request(request).await?;
525 requests.extend(actions.cross_chain_requests);
526 notifications.notify(&actions.notifications);
527 }
528 Ok(response)
529 })
530 .await
531 .unwrap_or_else(|_| Err(WorkerError::JoinError))
532 }
533
534 #[instrument(level = "trace", skip(self, block))]
536 pub async fn stage_block_execution(
537 &self,
538 block: ProposedBlock,
539 round: Option<u32>,
540 published_blobs: Vec<Blob>,
541 ) -> Result<(Block, ChainInfoResponse), WorkerError> {
542 self.query_chain_worker(block.chain_id, move |callback| {
543 ChainWorkerRequest::StageBlockExecution {
544 block,
545 round,
546 published_blobs,
547 callback,
548 }
549 })
550 .await
551 }
552
553 #[instrument(level = "trace", skip(self, chain_id, query))]
555 pub async fn query_application(
556 &self,
557 chain_id: ChainId,
558 query: Query,
559 ) -> Result<QueryOutcome, WorkerError> {
560 self.query_chain_worker(chain_id, move |callback| {
561 ChainWorkerRequest::QueryApplication { query, callback }
562 })
563 .await
564 }
565
566 #[instrument(level = "trace", skip(self, chain_id, application_id))]
567 pub async fn describe_application(
568 &self,
569 chain_id: ChainId,
570 application_id: ApplicationId,
571 ) -> Result<ApplicationDescription, WorkerError> {
572 self.query_chain_worker(chain_id, move |callback| {
573 ChainWorkerRequest::DescribeApplication {
574 application_id,
575 callback,
576 }
577 })
578 .await
579 }
580
581 #[instrument(
583 level = "trace",
584 skip(self, certificate, notify_when_messages_are_delivered)
585 )]
586 async fn process_confirmed_block(
587 &self,
588 certificate: ConfirmedBlockCertificate,
589 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
590 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
591 let chain_id = certificate.block().header.chain_id;
592 self.query_chain_worker(chain_id, move |callback| {
593 ChainWorkerRequest::ProcessConfirmedBlock {
594 certificate,
595 notify_when_messages_are_delivered,
596 callback,
597 }
598 })
599 .await
600 }
601
602 #[instrument(level = "trace", skip(self, certificate))]
604 async fn process_validated_block(
605 &self,
606 certificate: ValidatedBlockCertificate,
607 ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
608 let chain_id = certificate.block().header.chain_id;
609 self.query_chain_worker(chain_id, move |callback| {
610 ChainWorkerRequest::ProcessValidatedBlock {
611 certificate,
612 callback,
613 }
614 })
615 .await
616 }
617
618 #[instrument(level = "trace", skip(self, certificate))]
620 async fn process_timeout(
621 &self,
622 certificate: TimeoutCertificate,
623 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
624 let chain_id = certificate.value().chain_id();
625 self.query_chain_worker(chain_id, move |callback| {
626 ChainWorkerRequest::ProcessTimeout {
627 certificate,
628 callback,
629 }
630 })
631 .await
632 }
633
634 #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
635 async fn process_cross_chain_update(
636 &self,
637 origin: ChainId,
638 recipient: ChainId,
639 bundles: Vec<(Epoch, MessageBundle)>,
640 ) -> Result<Option<BlockHeight>, WorkerError> {
641 self.query_chain_worker(recipient, move |callback| {
642 ChainWorkerRequest::ProcessCrossChainUpdate {
643 origin,
644 bundles,
645 callback,
646 }
647 })
648 .await
649 }
650
651 #[instrument(level = "trace", skip(self, chain_id, height))]
653 #[cfg(with_testing)]
654 pub async fn read_certificate(
655 &self,
656 chain_id: ChainId,
657 height: BlockHeight,
658 ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
659 self.query_chain_worker(chain_id, move |callback| {
660 ChainWorkerRequest::ReadCertificate { height, callback }
661 })
662 .await
663 }
664
665 #[instrument(level = "trace", skip(self))]
671 pub async fn chain_state_view(
672 &self,
673 chain_id: ChainId,
674 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
675 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
676 callback,
677 })
678 .await
679 }
680
681 #[instrument(level = "trace", skip(self, request_builder))]
682 async fn query_chain_worker<Response>(
684 &self,
685 chain_id: ChainId,
686 request_builder: impl FnOnce(
687 oneshot::Sender<Result<Response, WorkerError>>,
688 ) -> ChainWorkerRequest<StorageClient::Context>,
689 ) -> Result<Response, WorkerError> {
690 let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
691 let (callback, response) = oneshot::channel();
692
693 chain_actor
694 .send((request_builder(callback), tracing::Span::current()))
695 .expect("`ChainWorkerActor` stopped executing unexpectedly");
696
697 response
698 .await
699 .expect("`ChainWorkerActor` stopped executing without responding")
700 }
701
702 #[instrument(level = "trace", skip(self))]
705 async fn get_chain_worker_endpoint(
706 &self,
707 chain_id: ChainId,
708 ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
709 let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
710 loop {
711 match self.try_get_chain_worker_endpoint(chain_id) {
712 Some(endpoint) => break endpoint,
713 None => sleep(Duration::from_millis(250)).await,
714 }
715 }
716 })
717 .await
718 .map_err(|_| WorkerError::FullChainWorkerCache)?;
719
720 if let Some(receiver) = new_receiver {
721 let delivery_notifier = self
722 .delivery_notifiers
723 .lock()
724 .unwrap()
725 .entry(chain_id)
726 .or_default()
727 .clone();
728
729 let actor_task = ChainWorkerActor::run(
730 self.chain_worker_config.clone(),
731 self.storage.clone(),
732 self.block_cache.clone(),
733 self.execution_state_cache.clone(),
734 self.tracked_chains.clone(),
735 delivery_notifier,
736 chain_id,
737 receiver,
738 );
739
740 self.chain_worker_tasks
741 .lock()
742 .unwrap()
743 .spawn_task(actor_task);
744 }
745
746 Ok(sender)
747 }
748
749 #[instrument(level = "trace", skip(self))]
754 #[expect(clippy::type_complexity)]
755 fn try_get_chain_worker_endpoint(
756 &self,
757 chain_id: ChainId,
758 ) -> Option<(
759 ChainActorEndpoint<StorageClient>,
760 Option<
761 mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
762 >,
763 )> {
764 let mut chain_workers = self.chain_workers.lock().unwrap();
765
766 if let Some(endpoint) = chain_workers.get(&chain_id) {
767 Some((endpoint.clone(), None))
768 } else {
769 let (sender, receiver) = mpsc::unbounded_channel();
770 chain_workers.insert(chain_id, sender.clone());
771 Some((sender, Some(receiver)))
772 }
773 }
774
775 #[instrument(skip_all, fields(
776 nick = self.nickname,
777 chain_id = format!("{:.8}", proposal.content.block.chain_id),
778 height = %proposal.content.block.height,
779 ))]
780 pub async fn handle_block_proposal(
781 &self,
782 proposal: BlockProposal,
783 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
784 trace!("{} <-- {:?}", self.nickname, proposal);
785 #[cfg(with_metrics)]
786 let round = proposal.content.round;
787 let response = self
788 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
789 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
790 })
791 .await?;
792 #[cfg(with_metrics)]
793 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
794 .with_label_values(&[round.type_name()])
795 .observe(round.number() as f64);
796 Ok(response)
797 }
798
799 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
802 pub async fn handle_lite_certificate(
803 &self,
804 certificate: LiteCertificate<'_>,
805 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
806 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
807 match self.full_certificate(certificate).await? {
808 Either::Left(confirmed) => {
809 self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
810 .await
811 }
812 Either::Right(validated) => {
813 if let Some(notifier) = notify_when_messages_are_delivered {
814 if let Err(()) = notifier.send(()) {
816 warn!("Failed to notify message delivery to caller");
817 }
818 }
819 self.handle_validated_certificate(validated).await
820 }
821 }
822 }
823
824 #[instrument(skip_all, fields(
826 nick = self.nickname,
827 chain_id = format!("{:.8}", certificate.block().header.chain_id),
828 height = %certificate.block().header.height,
829 ))]
830 pub async fn handle_confirmed_certificate(
831 &self,
832 certificate: ConfirmedBlockCertificate,
833 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
834 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
835 trace!("{} <-- {:?}", self.nickname, certificate);
836 #[cfg(with_metrics)]
837 let metrics_data = if self
838 .chain_state_view(certificate.block().header.chain_id)
839 .await?
840 .tip_state
841 .get()
842 .next_block_height
843 == certificate.block().header.height
844 {
845 Some((
846 certificate.inner().to_log_str(),
847 certificate.round.type_name(),
848 certificate.round.number(),
849 certificate.block().body.transactions.len() as u64,
850 certificate
851 .signatures()
852 .iter()
853 .map(|(validator_name, _)| validator_name.to_string())
854 .collect::<Vec<_>>(),
855 ))
856 } else {
857 None
859 };
860
861 let result = self
862 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
863 .await?;
864
865 #[cfg(with_metrics)]
866 {
867 if let Some(metrics_data) = metrics_data {
868 let (
869 certificate_log_str,
870 round_type,
871 round_number,
872 confirmed_transactions,
873 validators_with_signatures,
874 ) = metrics_data;
875 metrics::NUM_BLOCKS.with_label_values(&[]).inc();
876 metrics::NUM_ROUNDS_IN_CERTIFICATE
877 .with_label_values(&[certificate_log_str, round_type])
878 .observe(round_number as f64);
879 if confirmed_transactions > 0 {
880 metrics::TRANSACTION_COUNT
881 .with_label_values(&[])
882 .inc_by(confirmed_transactions);
883 }
884
885 for validator_name in validators_with_signatures {
886 metrics::CERTIFICATES_SIGNED
887 .with_label_values(&[&validator_name])
888 .inc();
889 }
890 }
891 }
892 Ok(result)
893 }
894
895 #[instrument(skip_all, fields(
897 nick = self.nickname,
898 chain_id = format!("{:.8}", certificate.block().header.chain_id),
899 height = %certificate.block().header.height,
900 ))]
901 pub async fn handle_validated_certificate(
902 &self,
903 certificate: ValidatedBlockCertificate,
904 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
905 trace!("{} <-- {:?}", self.nickname, certificate);
906
907 #[cfg(with_metrics)]
908 let round = certificate.round;
909 #[cfg(with_metrics)]
910 let cert_str = certificate.inner().to_log_str();
911
912 let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
913 #[cfg(with_metrics)]
914 {
915 if !_duplicated {
916 metrics::NUM_ROUNDS_IN_CERTIFICATE
917 .with_label_values(&[cert_str, round.type_name()])
918 .observe(round.number() as f64);
919 }
920 }
921 Ok((info, actions))
922 }
923
924 #[instrument(skip_all, fields(
926 nick = self.nickname,
927 chain_id = format!("{:.8}", certificate.inner().chain_id()),
928 height = %certificate.inner().height(),
929 ))]
930 pub async fn handle_timeout_certificate(
931 &self,
932 certificate: TimeoutCertificate,
933 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
934 trace!("{} <-- {:?}", self.nickname, certificate);
935 self.process_timeout(certificate).await
936 }
937
938 #[instrument(skip_all, fields(
939 nick = self.nickname,
940 chain_id = format!("{:.8}", query.chain_id)
941 ))]
942 pub async fn handle_chain_info_query(
943 &self,
944 query: ChainInfoQuery,
945 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
946 trace!("{} <-- {:?}", self.nickname, query);
947 #[cfg(with_metrics)]
948 metrics::CHAIN_INFO_QUERIES.inc();
949 let result = self
950 .query_chain_worker(query.chain_id, move |callback| {
951 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
952 })
953 .await;
954 trace!("{} --> {:?}", self.nickname, result);
955 result
956 }
957
958 #[instrument(skip_all, fields(
959 nick = self.nickname,
960 chain_id = format!("{:.8}", chain_id)
961 ))]
962 pub async fn download_pending_blob(
963 &self,
964 chain_id: ChainId,
965 blob_id: BlobId,
966 ) -> Result<Blob, WorkerError> {
967 trace!(
968 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
969 self.nickname
970 );
971 let result = self
972 .query_chain_worker(chain_id, move |callback| {
973 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
974 })
975 .await;
976 trace!(
977 "{} --> {:?}",
978 self.nickname,
979 result.as_ref().map(|_| blob_id)
980 );
981 result
982 }
983
984 #[instrument(skip_all, fields(
985 nick = self.nickname,
986 chain_id = format!("{:.8}", chain_id)
987 ))]
988 pub async fn handle_pending_blob(
989 &self,
990 chain_id: ChainId,
991 blob: Blob,
992 ) -> Result<ChainInfoResponse, WorkerError> {
993 let blob_id = blob.id();
994 trace!(
995 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
996 self.nickname
997 );
998 let result = self
999 .query_chain_worker(chain_id, move |callback| {
1000 ChainWorkerRequest::HandlePendingBlob { blob, callback }
1001 })
1002 .await;
1003 trace!(
1004 "{} --> {:?}",
1005 self.nickname,
1006 result.as_ref().map(|_| blob_id)
1007 );
1008 result
1009 }
1010
1011 #[instrument(skip_all, fields(
1012 nick = self.nickname,
1013 chain_id = format!("{:.8}", request.target_chain_id())
1014 ))]
1015 pub async fn handle_cross_chain_request(
1016 &self,
1017 request: CrossChainRequest,
1018 ) -> Result<NetworkActions, WorkerError> {
1019 trace!("{} <-- {:?}", self.nickname, request);
1020 match request {
1021 CrossChainRequest::UpdateRecipient {
1022 sender,
1023 recipient,
1024 bundles,
1025 } => {
1026 let mut actions = NetworkActions::default();
1027 let origin = sender;
1028 let Some(height) = self
1029 .process_cross_chain_update(origin, recipient, bundles)
1030 .await?
1031 else {
1032 return Ok(actions);
1033 };
1034 actions.notifications.push(Notification {
1035 chain_id: recipient,
1036 reason: Reason::NewIncomingBundle { origin, height },
1037 });
1038 actions
1039 .cross_chain_requests
1040 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1041 sender,
1042 recipient,
1043 latest_height: height,
1044 });
1045 Ok(actions)
1046 }
1047 CrossChainRequest::ConfirmUpdatedRecipient {
1048 sender,
1049 recipient,
1050 latest_height,
1051 } => {
1052 self.query_chain_worker(sender, move |callback| {
1053 ChainWorkerRequest::ConfirmUpdatedRecipient {
1054 recipient,
1055 latest_height,
1056 callback,
1057 }
1058 })
1059 .await?;
1060 Ok(NetworkActions::default())
1061 }
1062 }
1063 }
1064
1065 pub async fn update_received_certificate_trackers(
1067 &self,
1068 chain_id: ChainId,
1069 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1070 ) -> Result<(), WorkerError> {
1071 self.query_chain_worker(chain_id, move |callback| {
1072 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1073 new_trackers,
1074 callback,
1075 }
1076 })
1077 .await
1078 }
1079}
1080
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the public key of the key pair stored in the chain worker
    /// configuration (testing builds only).
    ///
    /// # Panics
    ///
    /// Panics if the configuration holds no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain it's public key",
        );
        key_pair.public()
    }
}