1use std::{
6 collections::{BTreeMap, HashMap, HashSet, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round},
15 doc_scalar,
16 hashed::Hashed,
17 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId},
18 time::timer::{sleep, timeout},
19};
20#[cfg(with_testing)]
21use linera_chain::ChainExecutionContext;
22use linera_chain::{
23 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24 types::{
25 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
26 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
27 },
28 ChainError, ChainStateView,
29};
30use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
31use linera_storage::Storage;
32use linera_views::ViewError;
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
36use tracing::{error, instrument, trace, warn};
37
38use crate::{
39 chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
40 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
41 join_set_ext::{JoinSet, JoinSetExt},
42 notifier::Notifier,
43 value_cache::ValueCache,
44};
45
46#[cfg(test)]
47#[path = "unit_tests/worker_tests.rs"]
48mod worker_tests;
49
50#[cfg(with_metrics)]
51mod metrics {
52 use std::sync::LazyLock;
53
54 use linera_base::prometheus_util::{
55 exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
56 };
57 use prometheus::{HistogramVec, IntCounterVec};
58
59 pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
60 register_histogram_vec(
61 "num_rounds_in_certificate",
62 "Number of rounds in certificate",
63 &["certificate_value", "round_type"],
64 exponential_bucket_interval(0.1, 50.0),
65 )
66 });
67
68 pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
69 register_histogram_vec(
70 "num_rounds_in_block_proposal",
71 "Number of rounds in block proposal",
72 &["round_type"],
73 exponential_bucket_interval(0.1, 50.0),
74 )
75 });
76
77 pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
78 LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));
79
80 pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
81 register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
82 });
83
84 pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
85 register_int_counter_vec(
86 "certificates_signed",
87 "Number of confirmed block certificates signed by each validator",
88 &["validator_name"],
89 )
90 });
91}
92
93#[derive(Default, Debug)]
95pub struct NetworkActions {
96 pub cross_chain_requests: Vec<CrossChainRequest>,
98 pub notifications: Vec<Notification>,
100}
101
102impl NetworkActions {
103 pub fn extend(&mut self, other: NetworkActions) {
104 self.cross_chain_requests.extend(other.cross_chain_requests);
105 self.notifications.extend(other.notifications);
106 }
107}
108
109#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
110pub struct Notification {
112 pub chain_id: ChainId,
113 pub reason: Reason,
114}
115
116doc_scalar!(
117 Notification,
118 "Notify that a chain has a new certified block or a new message"
119);
120
121#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
122pub enum Reason {
124 NewBlock {
125 height: BlockHeight,
126 hash: CryptoHash,
127 },
128 NewIncomingBundle {
129 origin: ChainId,
130 height: BlockHeight,
131 },
132 NewRound {
133 height: BlockHeight,
134 round: Round,
135 },
136}
137
138#[derive(Debug, Error)]
140pub enum WorkerError {
141 #[error(transparent)]
142 CryptoError(#[from] CryptoError),
143
144 #[error(transparent)]
145 ArithmeticError(#[from] ArithmeticError),
146
147 #[error(transparent)]
148 ViewError(#[from] ViewError),
149
150 #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
151 ReadCertificatesError(Vec<CryptoHash>),
152
153 #[error(transparent)]
154 ChainError(#[from] Box<ChainError>),
155
156 #[error("Block was not signed by an authorized owner")]
158 InvalidOwner,
159
160 #[error("Operations in the block are not authenticated by the proper signer: {0}")]
161 InvalidSigner(AccountOwner),
162
163 #[error(
165 "Was expecting block height {expected_block_height} but found {found_block_height} instead"
166 )]
167 UnexpectedBlockHeight {
168 expected_block_height: BlockHeight,
169 found_block_height: BlockHeight,
170 },
171 #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
172 InvalidEpoch {
173 chain_id: ChainId,
174 chain_epoch: Epoch,
175 epoch: Epoch,
176 },
177
178 #[error("Events not found: {0:?}")]
179 EventsNotFound(Vec<EventId>),
180
181 #[error("Invalid cross-chain request")]
183 InvalidCrossChainRequest,
184 #[error("The block does not contain the hash that we expected for the previous block")]
185 InvalidBlockChaining,
186 #[error(
187 "The given outcome is not what we computed after executing the block.\n\
188 Computed: {computed:#?}\n\
189 Submitted: {submitted:#?}"
190 )]
191 IncorrectOutcome {
192 computed: Box<BlockExecutionOutcome>,
193 submitted: Box<BlockExecutionOutcome>,
194 },
195 #[error("The block timestamp is in the future.")]
196 InvalidTimestamp,
197 #[error("We don't have the value for the certificate.")]
198 MissingCertificateValue,
199 #[error("The hash certificate doesn't match its value.")]
200 InvalidLiteCertificate,
201 #[error("Fast blocks cannot query oracles")]
202 FastBlockUsingOracles,
203 #[error("Blobs not found: {0:?}")]
204 BlobsNotFound(Vec<BlobId>),
205 #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
206 ConfirmedLogEntryNotFound {
207 height: BlockHeight,
208 chain_id: ChainId,
209 },
210 #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
211 PreprocessedBlocksEntryNotFound {
212 height: BlockHeight,
213 chain_id: ChainId,
214 },
215 #[error("The block proposal is invalid: {0}")]
216 InvalidBlockProposal(String),
217 #[error("The worker is too busy to handle new chains")]
218 FullChainWorkerCache,
219 #[error("Failed to join spawned worker task")]
220 JoinError,
221 #[error("Blob was not required by any pending block")]
222 UnexpectedBlob,
223 #[error("Number of published blobs per block must not exceed {0}")]
224 TooManyPublishedBlobs(u64),
225 #[error("Missing network description")]
226 MissingNetworkDescription,
227}
228
229impl From<ChainError> for WorkerError {
230 #[instrument(level = "trace", skip(chain_error))]
231 fn from(chain_error: ChainError) -> Self {
232 match chain_error {
233 ChainError::ExecutionError(execution_error, context) => {
234 if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
235 Self::BlobsNotFound(blob_ids)
236 } else {
237 Self::ChainError(Box::new(ChainError::ExecutionError(
238 execution_error,
239 context,
240 )))
241 }
242 }
243 error => Self::ChainError(Box::new(error)),
244 }
245 }
246}
247
248#[cfg(with_testing)]
249impl WorkerError {
250 pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
256 let WorkerError::ChainError(chain_error) = self else {
257 panic!("Expected an `ExecutionError`. Got: {self:#?}");
258 };
259
260 let ChainError::ExecutionError(execution_error, context) = *chain_error else {
261 panic!("Expected an `ExecutionError`. Got: {chain_error:#?}");
262 };
263
264 assert_eq!(context, expected_context);
265
266 *execution_error
267 }
268}
269
270pub struct WorkerState<StorageClient>
272where
273 StorageClient: Storage,
274{
275 nickname: String,
277 storage: StorageClient,
279 chain_worker_config: ChainWorkerConfig,
281 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
282 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
283 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
285 delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
288 chain_worker_tasks: Arc<Mutex<JoinSet>>,
290 chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
292}
293
294impl<StorageClient> Clone for WorkerState<StorageClient>
295where
296 StorageClient: Storage + Clone,
297{
298 fn clone(&self) -> Self {
299 WorkerState {
300 nickname: self.nickname.clone(),
301 storage: self.storage.clone(),
302 chain_worker_config: self.chain_worker_config.clone(),
303 block_cache: self.block_cache.clone(),
304 execution_state_cache: self.execution_state_cache.clone(),
305 tracked_chains: self.tracked_chains.clone(),
306 delivery_notifiers: self.delivery_notifiers.clone(),
307 chain_worker_tasks: self.chain_worker_tasks.clone(),
308 chain_workers: self.chain_workers.clone(),
309 }
310 }
311}
312
313type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
315 ChainWorkerRequest<<StorageClient as Storage>::Context>,
316 tracing::Span,
317)>;
318
319pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
320
321impl<StorageClient> WorkerState<StorageClient>
322where
323 StorageClient: Storage,
324{
325 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
326 pub fn new(
327 nickname: String,
328 key_pair: Option<ValidatorSecretKey>,
329 storage: StorageClient,
330 ) -> Self {
331 WorkerState {
332 nickname,
333 storage,
334 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
335 block_cache: Arc::new(ValueCache::default()),
336 execution_state_cache: Arc::new(ValueCache::default()),
337 tracked_chains: None,
338 delivery_notifiers: Arc::default(),
339 chain_worker_tasks: Arc::default(),
340 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
341 }
342 }
343
344 #[instrument(level = "trace", skip(nickname, storage))]
345 pub fn new_for_client(
346 nickname: String,
347 storage: StorageClient,
348 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
349 ) -> Self {
350 WorkerState {
351 nickname,
352 storage,
353 chain_worker_config: ChainWorkerConfig::default(),
354 block_cache: Arc::new(ValueCache::default()),
355 execution_state_cache: Arc::new(ValueCache::default()),
356 tracked_chains: Some(tracked_chains),
357 delivery_notifiers: Arc::default(),
358 chain_worker_tasks: Arc::default(),
359 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
360 }
361 }
362
363 #[instrument(level = "trace", skip(self, value))]
364 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
365 self.chain_worker_config.allow_inactive_chains = value;
366 self
367 }
368
369 #[instrument(level = "trace", skip(self, value))]
370 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
371 self.chain_worker_config
372 .allow_messages_from_deprecated_epochs = value;
373 self
374 }
375
376 #[instrument(level = "trace", skip(self, value))]
377 pub fn with_long_lived_services(mut self, value: bool) -> Self {
378 self.chain_worker_config.long_lived_services = value;
379 self
380 }
381
382 #[instrument(level = "trace", skip(self))]
387 pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
388 self.chain_worker_config.grace_period = grace_period;
389 self
390 }
391
392 #[instrument(level = "trace", skip(self))]
396 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
397 self.chain_worker_config.ttl = chain_worker_ttl;
398 self
399 }
400
401 #[instrument(level = "trace", skip(self))]
402 pub fn nickname(&self) -> &str {
403 &self.nickname
404 }
405
406 #[instrument(level = "trace", skip(self))]
408 #[cfg(not(feature = "test"))]
409 pub(crate) fn storage_client(&self) -> &StorageClient {
410 &self.storage
411 }
412
413 #[instrument(level = "trace", skip(self))]
416 #[cfg(feature = "test")]
417 pub fn storage_client(&self) -> &StorageClient {
418 &self.storage
419 }
420
421 #[instrument(level = "trace", skip(self, key_pair))]
422 #[cfg(test)]
423 pub(crate) async fn with_key_pair(mut self, key_pair: Option<Arc<ValidatorSecretKey>>) -> Self {
424 self.chain_worker_config.key_pair = key_pair;
425 self.chain_workers.lock().unwrap().clear();
426 self
427 }
428
429 #[instrument(level = "trace", skip(self, certificate))]
430 pub(crate) async fn full_certificate(
431 &self,
432 certificate: LiteCertificate<'_>,
433 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
434 let block = self
435 .block_cache
436 .get(&certificate.value.value_hash)
437 .ok_or(WorkerError::MissingCertificateValue)?;
438
439 match certificate.value.kind {
440 linera_chain::types::CertificateKind::Confirmed => {
441 let value = ConfirmedBlock::from_hashed(block);
442 Ok(Either::Left(
443 certificate
444 .with_value(value)
445 .ok_or(WorkerError::InvalidLiteCertificate)?,
446 ))
447 }
448 linera_chain::types::CertificateKind::Validated => {
449 let value = ValidatedBlock::from_hashed(block);
450 Ok(Either::Right(
451 certificate
452 .with_value(value)
453 .ok_or(WorkerError::InvalidLiteCertificate)?,
454 ))
455 }
456 _ => return Err(WorkerError::InvalidLiteCertificate),
457 }
458 }
459}
460
461#[allow(async_fn_in_trait)]
462#[cfg_attr(not(web), trait_variant::make(Send))]
463pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
464 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
465 worker: &WorkerState<S>,
466 certificate: GenericCertificate<Self>,
467 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
468}
469
470impl ProcessableCertificate for ConfirmedBlock {
471 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
472 worker: &WorkerState<S>,
473 certificate: ConfirmedBlockCertificate,
474 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
475 worker.handle_confirmed_certificate(certificate, None).await
476 }
477}
478
479impl ProcessableCertificate for ValidatedBlock {
480 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
481 worker: &WorkerState<S>,
482 certificate: ValidatedBlockCertificate,
483 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
484 worker.handle_validated_certificate(certificate).await
485 }
486}
487
488impl ProcessableCertificate for Timeout {
489 async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
490 worker: &WorkerState<S>,
491 certificate: TimeoutCertificate,
492 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
493 worker.handle_timeout_certificate(certificate).await
494 }
495}
496
497impl<StorageClient> WorkerState<StorageClient>
498where
499 StorageClient: Storage + Clone + Send + Sync + 'static,
500{
501 #[instrument(level = "trace", skip(self, certificate, notifier))]
502 #[inline]
503 pub async fn fully_handle_certificate_with_notifications<T>(
504 &self,
505 certificate: GenericCertificate<T>,
506 notifier: &impl Notifier,
507 ) -> Result<ChainInfoResponse, WorkerError>
508 where
509 T: ProcessableCertificate,
510 {
511 let notifications = (*notifier).clone();
512 let this = self.clone();
513 linera_base::task::spawn(async move {
514 let (response, actions) =
515 ProcessableCertificate::process_certificate(&this, certificate).await?;
516 notifications.notify(&actions.notifications);
517 let mut requests = VecDeque::from(actions.cross_chain_requests);
518 while let Some(request) = requests.pop_front() {
519 let actions = this.handle_cross_chain_request(request).await?;
520 requests.extend(actions.cross_chain_requests);
521 notifications.notify(&actions.notifications);
522 }
523 Ok(response)
524 })
525 .await
526 .unwrap_or_else(|_| Err(WorkerError::JoinError))
527 }
528
529 #[instrument(level = "trace", skip(self, block))]
531 pub async fn stage_block_execution(
532 &self,
533 block: ProposedBlock,
534 round: Option<u32>,
535 published_blobs: Vec<Blob>,
536 ) -> Result<(Block, ChainInfoResponse), WorkerError> {
537 self.query_chain_worker(block.chain_id, move |callback| {
538 ChainWorkerRequest::StageBlockExecution {
539 block,
540 round,
541 published_blobs,
542 callback,
543 }
544 })
545 .await
546 }
547
548 #[instrument(level = "trace", skip(self, chain_id, query))]
550 pub async fn query_application(
551 &self,
552 chain_id: ChainId,
553 query: Query,
554 ) -> Result<QueryOutcome, WorkerError> {
555 self.query_chain_worker(chain_id, move |callback| {
556 ChainWorkerRequest::QueryApplication { query, callback }
557 })
558 .await
559 }
560
561 #[instrument(level = "trace", skip(self, chain_id, application_id))]
562 pub async fn describe_application(
563 &self,
564 chain_id: ChainId,
565 application_id: ApplicationId,
566 ) -> Result<ApplicationDescription, WorkerError> {
567 self.query_chain_worker(chain_id, move |callback| {
568 ChainWorkerRequest::DescribeApplication {
569 application_id,
570 callback,
571 }
572 })
573 .await
574 }
575
576 #[instrument(
578 level = "trace",
579 skip(self, certificate, notify_when_messages_are_delivered)
580 )]
581 async fn process_confirmed_block(
582 &self,
583 certificate: ConfirmedBlockCertificate,
584 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
585 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
586 let chain_id = certificate.block().header.chain_id;
587
588 let (response, actions) = self
589 .query_chain_worker(chain_id, move |callback| {
590 ChainWorkerRequest::ProcessConfirmedBlock {
591 certificate,
592 notify_when_messages_are_delivered,
593 callback,
594 }
595 })
596 .await?;
597
598 #[cfg(with_metrics)]
599 metrics::NUM_BLOCKS.with_label_values(&[]).inc();
600
601 Ok((response, actions))
602 }
603
604 #[instrument(level = "trace", skip(self, certificate))]
606 async fn process_validated_block(
607 &self,
608 certificate: ValidatedBlockCertificate,
609 ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
610 let chain_id = certificate.block().header.chain_id;
611
612 self.query_chain_worker(chain_id, move |callback| {
613 ChainWorkerRequest::ProcessValidatedBlock {
614 certificate,
615 callback,
616 }
617 })
618 .await
619 }
620
621 #[instrument(level = "trace", skip(self, certificate))]
623 async fn process_timeout(
624 &self,
625 certificate: TimeoutCertificate,
626 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
627 let chain_id = certificate.value().chain_id();
628 self.query_chain_worker(chain_id, move |callback| {
629 ChainWorkerRequest::ProcessTimeout {
630 certificate,
631 callback,
632 }
633 })
634 .await
635 }
636
637 #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
638 async fn process_cross_chain_update(
639 &self,
640 origin: ChainId,
641 recipient: ChainId,
642 bundles: Vec<(Epoch, MessageBundle)>,
643 ) -> Result<Option<BlockHeight>, WorkerError> {
644 self.query_chain_worker(recipient, move |callback| {
645 ChainWorkerRequest::ProcessCrossChainUpdate {
646 origin,
647 bundles,
648 callback,
649 }
650 })
651 .await
652 }
653
654 #[instrument(level = "trace", skip(self, chain_id, height))]
656 #[cfg(with_testing)]
657 pub async fn read_certificate(
658 &self,
659 chain_id: ChainId,
660 height: BlockHeight,
661 ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
662 self.query_chain_worker(chain_id, move |callback| {
663 ChainWorkerRequest::ReadCertificate { height, callback }
664 })
665 .await
666 }
667
668 #[instrument(level = "trace", skip(self))]
674 pub async fn chain_state_view(
675 &self,
676 chain_id: ChainId,
677 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
678 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
679 callback,
680 })
681 .await
682 }
683
684 #[instrument(level = "trace", skip(self, request_builder))]
685 async fn query_chain_worker<Response>(
687 &self,
688 chain_id: ChainId,
689 request_builder: impl FnOnce(
690 oneshot::Sender<Result<Response, WorkerError>>,
691 ) -> ChainWorkerRequest<StorageClient::Context>,
692 ) -> Result<Response, WorkerError> {
693 let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
694 let (callback, response) = oneshot::channel();
695
696 chain_actor
697 .send((request_builder(callback), tracing::Span::current()))
698 .expect("`ChainWorkerActor` stopped executing unexpectedly");
699
700 response
701 .await
702 .expect("`ChainWorkerActor` stopped executing without responding")
703 }
704
705 #[instrument(level = "trace", skip(self))]
708 async fn get_chain_worker_endpoint(
709 &self,
710 chain_id: ChainId,
711 ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
712 let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
713 loop {
714 match self.try_get_chain_worker_endpoint(chain_id) {
715 Some(endpoint) => break endpoint,
716 None => sleep(Duration::from_millis(250)).await,
717 }
718 }
719 })
720 .await
721 .map_err(|_| WorkerError::FullChainWorkerCache)?;
722
723 if let Some(receiver) = new_receiver {
724 let delivery_notifier = self
725 .delivery_notifiers
726 .lock()
727 .unwrap()
728 .entry(chain_id)
729 .or_default()
730 .clone();
731
732 let actor_task = ChainWorkerActor::run(
733 self.chain_worker_config.clone(),
734 self.storage.clone(),
735 self.block_cache.clone(),
736 self.execution_state_cache.clone(),
737 self.tracked_chains.clone(),
738 delivery_notifier,
739 chain_id,
740 receiver,
741 );
742
743 self.chain_worker_tasks
744 .lock()
745 .unwrap()
746 .spawn_task(actor_task);
747 }
748
749 Ok(sender)
750 }
751
752 #[instrument(level = "trace", skip(self))]
757 #[expect(clippy::type_complexity)]
758 fn try_get_chain_worker_endpoint(
759 &self,
760 chain_id: ChainId,
761 ) -> Option<(
762 ChainActorEndpoint<StorageClient>,
763 Option<
764 mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
765 >,
766 )> {
767 let mut chain_workers = self.chain_workers.lock().unwrap();
768
769 if let Some(endpoint) = chain_workers.get(&chain_id) {
770 Some((endpoint.clone(), None))
771 } else {
772 let (sender, receiver) = mpsc::unbounded_channel();
773 chain_workers.insert(chain_id, sender.clone());
774 Some((sender, Some(receiver)))
775 }
776 }
777
778 #[instrument(skip_all, fields(
779 nick = self.nickname,
780 chain_id = format!("{:.8}", proposal.content.block.chain_id),
781 height = %proposal.content.block.height,
782 ))]
783 pub async fn handle_block_proposal(
784 &self,
785 proposal: BlockProposal,
786 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
787 trace!("{} <-- {:?}", self.nickname, proposal);
788 #[cfg(with_metrics)]
789 let round = proposal.content.round;
790 let response = self
791 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
792 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
793 })
794 .await?;
795 #[cfg(with_metrics)]
796 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
797 .with_label_values(&[round.type_name()])
798 .observe(round.number() as f64);
799 Ok(response)
800 }
801
802 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
805 pub async fn handle_lite_certificate<'a>(
806 &self,
807 certificate: LiteCertificate<'a>,
808 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
809 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
810 match self.full_certificate(certificate).await? {
811 Either::Left(confirmed) => {
812 self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
813 .await
814 }
815 Either::Right(validated) => {
816 if let Some(notifier) = notify_when_messages_are_delivered {
817 if let Err(()) = notifier.send(()) {
819 warn!("Failed to notify message delivery to caller");
820 }
821 }
822 self.handle_validated_certificate(validated).await
823 }
824 }
825 }
826
827 #[instrument(skip_all, fields(
829 nick = self.nickname,
830 chain_id = format!("{:.8}", certificate.block().header.chain_id),
831 height = %certificate.block().header.height,
832 ))]
833 pub async fn handle_confirmed_certificate(
834 &self,
835 certificate: ConfirmedBlockCertificate,
836 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
837 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
838 trace!("{} <-- {:?}", self.nickname, certificate);
839 #[cfg(with_metrics)]
840 {
841 let confirmed_transactions = (certificate.block().body.incoming_bundles.len()
842 + certificate.block().body.operations.len())
843 as u64;
844
845 metrics::NUM_ROUNDS_IN_CERTIFICATE
846 .with_label_values(&[
847 certificate.inner().to_log_str(),
848 certificate.round.type_name(),
849 ])
850 .observe(certificate.round.number() as f64);
851 if confirmed_transactions > 0 {
852 metrics::TRANSACTION_COUNT
853 .with_label_values(&[])
854 .inc_by(confirmed_transactions);
855 }
856
857 for (validator_name, _) in certificate.signatures() {
858 metrics::CERTIFICATES_SIGNED
859 .with_label_values(&[&validator_name.to_string()])
860 .inc();
861 }
862 }
863
864 self.process_confirmed_block(certificate, notify_when_messages_are_delivered)
865 .await
866 }
867
868 #[instrument(skip_all, fields(
870 nick = self.nickname,
871 chain_id = format!("{:.8}", certificate.block().header.chain_id),
872 height = %certificate.block().header.height,
873 ))]
874 pub async fn handle_validated_certificate(
875 &self,
876 certificate: ValidatedBlockCertificate,
877 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
878 trace!("{} <-- {:?}", self.nickname, certificate);
879
880 #[cfg(with_metrics)]
881 let round = certificate.round;
882 #[cfg(with_metrics)]
883 let cert_str = certificate.inner().to_log_str();
884
885 let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
886 #[cfg(with_metrics)]
887 {
888 if !_duplicated {
889 metrics::NUM_ROUNDS_IN_CERTIFICATE
890 .with_label_values(&[cert_str, round.type_name()])
891 .observe(round.number() as f64);
892 }
893 }
894 Ok((info, actions))
895 }
896
897 #[instrument(skip_all, fields(
899 nick = self.nickname,
900 chain_id = format!("{:.8}", certificate.inner().chain_id()),
901 height = %certificate.inner().height(),
902 ))]
903 pub async fn handle_timeout_certificate(
904 &self,
905 certificate: TimeoutCertificate,
906 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
907 trace!("{} <-- {:?}", self.nickname, certificate);
908 self.process_timeout(certificate).await
909 }
910
911 #[instrument(skip_all, fields(
912 nick = self.nickname,
913 chain_id = format!("{:.8}", query.chain_id)
914 ))]
915 pub async fn handle_chain_info_query(
916 &self,
917 query: ChainInfoQuery,
918 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
919 trace!("{} <-- {:?}", self.nickname, query);
920 let result = self
921 .query_chain_worker(query.chain_id, move |callback| {
922 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
923 })
924 .await;
925 trace!("{} --> {:?}", self.nickname, result);
926 result
927 }
928
929 #[instrument(skip_all, fields(
930 nick = self.nickname,
931 chain_id = format!("{:.8}", chain_id)
932 ))]
933 pub async fn download_pending_blob(
934 &self,
935 chain_id: ChainId,
936 blob_id: BlobId,
937 ) -> Result<Blob, WorkerError> {
938 trace!(
939 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
940 self.nickname
941 );
942 let result = self
943 .query_chain_worker(chain_id, move |callback| {
944 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
945 })
946 .await;
947 trace!(
948 "{} --> {:?}",
949 self.nickname,
950 result.as_ref().map(|_| blob_id)
951 );
952 result
953 }
954
955 #[instrument(skip_all, fields(
956 nick = self.nickname,
957 chain_id = format!("{:.8}", chain_id)
958 ))]
959 pub async fn handle_pending_blob(
960 &self,
961 chain_id: ChainId,
962 blob: Blob,
963 ) -> Result<ChainInfoResponse, WorkerError> {
964 let blob_id = blob.id();
965 trace!(
966 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
967 self.nickname
968 );
969 let result = self
970 .query_chain_worker(chain_id, move |callback| {
971 ChainWorkerRequest::HandlePendingBlob { blob, callback }
972 })
973 .await;
974 trace!(
975 "{} --> {:?}",
976 self.nickname,
977 result.as_ref().map(|_| blob_id)
978 );
979 result
980 }
981
982 #[instrument(skip_all, fields(
983 nick = self.nickname,
984 chain_id = format!("{:.8}", request.target_chain_id())
985 ))]
986 pub async fn handle_cross_chain_request(
987 &self,
988 request: CrossChainRequest,
989 ) -> Result<NetworkActions, WorkerError> {
990 trace!("{} <-- {:?}", self.nickname, request);
991 match request {
992 CrossChainRequest::UpdateRecipient {
993 sender,
994 recipient,
995 bundles,
996 } => {
997 let mut actions = NetworkActions::default();
998 let origin = sender;
999 let Some(height) = self
1000 .process_cross_chain_update(origin, recipient, bundles)
1001 .await?
1002 else {
1003 return Ok(actions);
1004 };
1005 actions.notifications.push(Notification {
1006 chain_id: recipient,
1007 reason: Reason::NewIncomingBundle { origin, height },
1008 });
1009 actions
1010 .cross_chain_requests
1011 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1012 sender,
1013 recipient,
1014 latest_height: height,
1015 });
1016 Ok(actions)
1017 }
1018 CrossChainRequest::ConfirmUpdatedRecipient {
1019 sender,
1020 recipient,
1021 latest_height,
1022 } => {
1023 self.query_chain_worker(sender, move |callback| {
1024 ChainWorkerRequest::ConfirmUpdatedRecipient {
1025 recipient,
1026 latest_height,
1027 callback,
1028 }
1029 })
1030 .await?;
1031 Ok(NetworkActions::default())
1032 }
1033 }
1034 }
1035
1036 pub async fn update_received_certificate_trackers(
1038 &self,
1039 chain_id: ChainId,
1040 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1041 ) -> Result<(), WorkerError> {
1042 self.query_chain_worker(chain_id, move |callback| {
1043 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1044 new_trackers,
1045 callback,
1046 }
1047 })
1048 .await
1049 }
1050}
1051
1052#[cfg(with_testing)]
1053impl<StorageClient> WorkerState<StorageClient>
1054where
1055 StorageClient: Storage,
1056{
1057 #[instrument(level = "trace", skip(self))]
1063 pub fn public_key(&self) -> ValidatorPublicKey {
1064 self.chain_worker_config
1065 .key_pair()
1066 .expect(
1067 "Test validator should have a key pair assigned to it \
1068 in order to obtain it's public key",
1069 )
1070 .public()
1071 }
1072}