1use std::{
6 collections::{BTreeMap, HashMap, HashSet, VecDeque},
7 num::NonZeroUsize,
8 sync::{Arc, Mutex, RwLock},
9 time::Duration,
10};
11
12use futures::future::Either;
13use linera_base::{
14 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
15 data_types::{
16 ApplicationDescription, ArithmeticError, Blob, BlockHeight, DecompressionError, Epoch,
17 Round,
18 },
19 doc_scalar,
20 hashed::Hashed,
21 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId},
22 time::timer::{sleep, timeout},
23};
24#[cfg(with_testing)]
25use linera_chain::ChainExecutionContext;
26use linera_chain::{
27 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
28 types::{
29 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
30 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
31 },
32 ChainError, ChainStateView,
33};
34use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome};
35use linera_storage::Storage;
36use linera_views::ViewError;
37use lru::LruCache;
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
41use tracing::{error, instrument, trace, warn};
42
43use crate::{
44 chain_worker::{ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier},
45 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
46 join_set_ext::{JoinSet, JoinSetExt},
47 notifier::Notifier,
48 value_cache::ValueCache,
49};
50
// Unit tests for this module. They are compiled only for test builds and are
// loaded from `unit_tests/worker_tests.rs` (relative to this file).
#[cfg(test)]
#[path = "unit_tests/worker_tests.rs"]
mod worker_tests;
54
/// Prometheus metrics recorded by the worker.
///
/// The module only exists when the `with_metrics` cfg is set. Every metric is a
/// `LazyLock`, so it is registered the first time it is dereferenced.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Histogram of the round number carried by handled certificates, labelled by
    /// certificate value kind and round type.
    // NOTE(review): bucket bounds come from `exponential_bucket_interval(0.1, 50.0)`;
    // the exact bucket layout is defined by that helper.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number carried by handled block proposals, labelled by
    /// round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Counter of transactions (incoming bundles plus operations) found in handled
    /// confirmed block certificates.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Counter incremented each time a confirmed block is successfully processed.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Counter, per validator name, of signatures seen on handled confirmed block
    /// certificates.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });
}
97
/// Follow-up work produced while handling a request: cross-chain requests that still
/// have to be handled and notifications that should be passed to a notifier.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// Cross-chain requests to handle next.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// Notifications to deliver.
    pub notifications: Vec<Notification>,
}
106
107impl NetworkActions {
108 pub fn extend(&mut self, other: NetworkActions) {
109 self.cross_chain_requests.extend(other.cross_chain_requests);
110 self.notifications.extend(other.notifications);
111 }
112}
113
/// A notification about an event on a chain.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Notification {
    /// The chain the notification is about.
    pub chain_id: ChainId,
    /// What happened on that chain.
    pub reason: Reason,
}

// Registers `Notification` through the `doc_scalar!` macro, with the description below.
doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
125
/// The event a [`Notification`] reports.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Reason {
    /// A new block, identified by its height and hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// A cross-chain update from `origin` was processed for the notified chain;
    /// `height` is the height returned by that processing.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round at the given block height.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
}
142
/// Errors returned by the worker.
///
/// The user-facing text of every variant is given by its `#[error(...)]` attribute;
/// `transparent` variants forward the message of the wrapped error.
// NOTE(review): several messages format `chain_id` with `{chain_id:8}`, which is a
// minimum *width*, while the tracing span fields in this file use `{:.8}`
// (a *precision*). Confirm which of the two is intended.
#[derive(Debug, Error)]
pub enum WorkerError {
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A chain error. It is boxed, and `From<ChainError>` maps missing-blob execution
    /// errors to [`WorkerError::BlobsNotFound`] instead of this variant.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper signer: {0}")]
    InvalidSigner(AccountOwner),

    #[error(
        "Was expecting block height {expected_block_height} but found {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Cannot confirm a block before its predecessors: {current_block_height:?}")]
    MissingEarlierBlocks { current_block_height: BlockHeight },
    #[error("Unexpected epoch {epoch:}: chain {chain_id:} is at {chain_epoch:}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    #[error(
        "The given outcome is not what we computed after executing the block.\n\
        Computed: {computed:#?}\n\
        Submitted: {submitted:#?}"
    )]
    IncorrectOutcome {
        computed: Box<BlockExecutionOutcome>,
        submitted: Box<BlockExecutionOutcome>,
    },
    #[error("The timestamp of a Tick operation is in the future.")]
    InvalidTimestamp,
    /// The block referenced by a lite certificate is not in the worker's block cache.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    /// A lite certificate could not be combined with the cached value, or has an
    /// unsupported kind.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    /// No chain worker endpoint could be obtained before the acquisition timeout.
    #[error("The worker is too busy to handle new chains")]
    FullChainWorkerCache,
    /// A task spawned by the worker could not be joined.
    #[error("Failed to join spawned worker task")]
    JoinError,
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error(transparent)]
    Decompression(#[from] DecompressionError),
}
229
230impl From<ChainError> for WorkerError {
231 #[instrument(level = "trace", skip(chain_error))]
232 fn from(chain_error: ChainError) -> Self {
233 match chain_error {
234 ChainError::ExecutionError(execution_error, context) => {
235 if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
236 Self::BlobsNotFound(blob_ids)
237 } else {
238 Self::ChainError(Box::new(ChainError::ExecutionError(
239 execution_error,
240 context,
241 )))
242 }
243 }
244 error => Self::ChainError(Box::new(error)),
245 }
246 }
247}
248
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the [`ExecutionError`] inside this error, for use in tests.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not a [`WorkerError::ChainError`] wrapping a
    /// [`ChainError::ExecutionError`], or if the error's context differs from
    /// `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
270
/// State of a worker, which forwards requests to one `ChainWorkerActor` per chain.
///
/// All mutable state is held behind `Arc`s, so clones of a `WorkerState` share the
/// same caches, notifiers, tasks and chain worker endpoints.
pub struct WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Name of this worker, used in log messages and tracing spans.
    nickname: String,
    /// The storage client handed to every chain worker actor.
    storage: StorageClient,
    /// Configuration cloned into every chain worker actor.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of blocks by hash; used to turn lite certificates into full ones.
    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Cache of execution state views by hash, shared with the chain worker actors.
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
    /// Set of tracked chains: `Some` when built with `new_for_client`, `None` when
    /// built with `new`.
    tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
    /// One delivery notifier per chain, shared with that chain's actor.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The set of spawned chain worker actor tasks.
    chain_worker_tasks: Arc<Mutex<JoinSet>>,
    /// LRU cache of the endpoints of the running chain worker actors.
    chain_workers: Arc<Mutex<LruCache<ChainId, ChainActorEndpoint<StorageClient>>>>,
}
294
295impl<StorageClient> Clone for WorkerState<StorageClient>
296where
297 StorageClient: Storage + Clone,
298{
299 fn clone(&self) -> Self {
300 WorkerState {
301 nickname: self.nickname.clone(),
302 storage: self.storage.clone(),
303 chain_worker_config: self.chain_worker_config.clone(),
304 block_cache: self.block_cache.clone(),
305 execution_state_cache: self.execution_state_cache.clone(),
306 tracked_chains: self.tracked_chains.clone(),
307 delivery_notifiers: self.delivery_notifiers.clone(),
308 chain_worker_tasks: self.chain_worker_tasks.clone(),
309 chain_workers: self.chain_workers.clone(),
310 }
311 }
312}
313
/// The sending half of the channel to a chain worker actor. Each message is a request
/// paired with the tracing span that was current when the request was sent.
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
    ChainWorkerRequest<<StorageClient as Storage>::Context>,
    tracing::Span,
)>;
319
/// The delivery notifier of each chain, keyed by chain ID.
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
321
322impl<StorageClient> WorkerState<StorageClient>
323where
324 StorageClient: Storage,
325{
326 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
327 pub fn new(
328 nickname: String,
329 key_pair: Option<ValidatorSecretKey>,
330 storage: StorageClient,
331 chain_worker_limit: NonZeroUsize,
332 ) -> Self {
333 WorkerState {
334 nickname,
335 storage,
336 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
337 block_cache: Arc::new(ValueCache::default()),
338 execution_state_cache: Arc::new(ValueCache::default()),
339 tracked_chains: None,
340 delivery_notifiers: Arc::default(),
341 chain_worker_tasks: Arc::default(),
342 chain_workers: Arc::new(Mutex::new(LruCache::new(chain_worker_limit))),
343 }
344 }
345
346 #[instrument(level = "trace", skip(nickname, storage))]
347 pub fn new_for_client(
348 nickname: String,
349 storage: StorageClient,
350 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
351 chain_worker_limit: NonZeroUsize,
352 ) -> Self {
353 WorkerState {
354 nickname,
355 storage,
356 chain_worker_config: ChainWorkerConfig::default(),
357 block_cache: Arc::new(ValueCache::default()),
358 execution_state_cache: Arc::new(ValueCache::default()),
359 tracked_chains: Some(tracked_chains),
360 delivery_notifiers: Arc::default(),
361 chain_worker_tasks: Arc::default(),
362 chain_workers: Arc::new(Mutex::new(LruCache::new(chain_worker_limit))),
363 }
364 }
365
    /// Returns `self` with the `allow_inactive_chains` option of the chain worker
    /// configuration set to `value`.
    #[instrument(level = "trace", skip(self, value))]
    pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
        self.chain_worker_config.allow_inactive_chains = value;
        self
    }
371
    /// Returns `self` with the `allow_messages_from_deprecated_epochs` option of the
    /// chain worker configuration set to `value`.
    #[instrument(level = "trace", skip(self, value))]
    pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
        self.chain_worker_config
            .allow_messages_from_deprecated_epochs = value;
        self
    }
378
    /// Returns `self` with the `long_lived_services` option of the chain worker
    /// configuration set to `value`.
    #[instrument(level = "trace", skip(self, value))]
    pub fn with_long_lived_services(mut self, value: bool) -> Self {
        self.chain_worker_config.long_lived_services = value;
        self
    }
384
    /// Returns `self` with the `grace_period` of the chain worker configuration set to
    /// the given duration.
    // NOTE(review): how the grace period is applied is decided by the chain worker,
    // not by this file.
    #[instrument(level = "trace", skip(self, grace_period))]
    pub fn with_grace_period(mut self, grace_period: Duration) -> Self {
        self.chain_worker_config.grace_period = grace_period;
        self
    }
394
    /// Returns the nickname of this worker.
    #[instrument(level = "trace", skip(self))]
    pub fn nickname(&self) -> &str {
        &self.nickname
    }
399
    /// Returns a reference to the storage client.
    ///
    /// This crate-private variant is compiled when the `test` feature is disabled.
    #[instrument(level = "trace", skip(self))]
    #[cfg(not(feature = "test"))]
    pub(crate) fn storage_client(&self) -> &StorageClient {
        &self.storage
    }
406
    /// Returns a reference to the storage client.
    ///
    /// This public variant is compiled when the `test` feature is enabled.
    #[instrument(level = "trace", skip(self))]
    #[cfg(feature = "test")]
    pub fn storage_client(&self) -> &StorageClient {
        &self.storage
    }
414
    /// Test-only: replaces the key pair in the chain worker configuration and clears
    /// the cache of chain worker endpoints.
    ///
    /// Actors are created from a clone of the configuration, so clearing the cache
    /// makes later requests create actors that see the new key pair.
    #[instrument(level = "trace", skip(self, key_pair))]
    #[cfg(test)]
    pub(crate) async fn with_key_pair(mut self, key_pair: Option<Arc<ValidatorSecretKey>>) -> Self {
        self.chain_worker_config.key_pair = key_pair;
        // The lock guard is a temporary and is released at the end of this statement.
        self.chain_workers.lock().unwrap().clear();
        self
    }
422
423 #[instrument(level = "trace", skip(self, certificate))]
424 pub(crate) async fn full_certificate(
425 &self,
426 certificate: LiteCertificate<'_>,
427 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
428 let block = self
429 .block_cache
430 .get(&certificate.value.value_hash)
431 .ok_or(WorkerError::MissingCertificateValue)?;
432
433 match certificate.value.kind {
434 linera_chain::types::CertificateKind::Confirmed => {
435 let value = ConfirmedBlock::from_hashed(block);
436 Ok(Either::Left(
437 certificate
438 .with_value(value)
439 .ok_or(WorkerError::InvalidLiteCertificate)?,
440 ))
441 }
442 linera_chain::types::CertificateKind::Validated => {
443 let value = ValidatedBlock::from_hashed(block);
444 Ok(Either::Right(
445 certificate
446 .with_value(value)
447 .ok_or(WorkerError::InvalidLiteCertificate)?,
448 ))
449 }
450 _ => return Err(WorkerError::InvalidLiteCertificate),
451 }
452 }
453}
454
/// A certificate value type whose certificates a [`WorkerState`] can process.
///
/// Implemented in this file for `ConfirmedBlock`, `ValidatedBlock` and `Timeout`;
/// each implementation forwards to the matching `handle_*_certificate` method.
// On non-`web` builds, `trait_variant::make(Send)` is applied to the trait.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker` and returns the chain info response
    /// together with the resulting network actions.
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
463
impl ProcessableCertificate for ConfirmedBlock {
    /// Handles a confirmed block certificate, without asking to be notified when its
    /// messages are delivered (the notifier argument is `None`).
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: ConfirmedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        worker.handle_confirmed_certificate(certificate, None).await
    }
}
472
impl ProcessableCertificate for ValidatedBlock {
    /// Handles a validated block certificate via
    /// [`WorkerState::handle_validated_certificate`].
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: ValidatedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        worker.handle_validated_certificate(certificate).await
    }
}
481
impl ProcessableCertificate for Timeout {
    /// Handles a timeout certificate via [`WorkerState::handle_timeout_certificate`].
    async fn process_certificate<S: Storage + Clone + Send + Sync + 'static>(
        worker: &WorkerState<S>,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        worker.handle_timeout_certificate(certificate).await
    }
}
490
491impl<StorageClient> WorkerState<StorageClient>
492where
493 StorageClient: Storage + Clone + Send + Sync + 'static,
494{
495 #[instrument(level = "trace", skip(self, certificate, notifier))]
496 #[inline]
497 pub async fn fully_handle_certificate_with_notifications<T>(
498 &self,
499 certificate: GenericCertificate<T>,
500 notifier: &impl Notifier,
501 ) -> Result<ChainInfoResponse, WorkerError>
502 where
503 T: ProcessableCertificate,
504 {
505 let notifications = (*notifier).clone();
506 let this = self.clone();
507 linera_base::task::spawn(async move {
508 let (response, actions) =
509 ProcessableCertificate::process_certificate(&this, certificate).await?;
510 notifications.notify(&actions.notifications);
511 let mut requests = VecDeque::from(actions.cross_chain_requests);
512 while let Some(request) = requests.pop_front() {
513 let actions = this.handle_cross_chain_request(request).await?;
514 requests.extend(actions.cross_chain_requests);
515 notifications.notify(&actions.notifications);
516 }
517 Ok(response)
518 })
519 .await
520 .unwrap_or_else(|_| Err(WorkerError::JoinError))
521 }
522
    /// Sends a `StageBlockExecution` request for `block`, with the given round and
    /// published blobs, to the worker of `block.chain_id`, and returns the resulting
    /// block and chain info response.
    #[instrument(level = "trace", skip(self, block))]
    pub async fn stage_block_execution(
        &self,
        block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
        self.query_chain_worker(block.chain_id, move |callback| {
            ChainWorkerRequest::StageBlockExecution {
                block,
                round,
                published_blobs,
                callback,
            }
        })
        .await
    }
541
    /// Sends a `QueryApplication` request carrying `query` to the worker of
    /// `chain_id` and returns the query outcome.
    #[instrument(level = "trace", skip(self, chain_id, query))]
    pub async fn query_application(
        &self,
        chain_id: ChainId,
        query: Query,
    ) -> Result<QueryOutcome, WorkerError> {
        self.query_chain_worker(chain_id, move |callback| {
            ChainWorkerRequest::QueryApplication { query, callback }
        })
        .await
    }
554
    /// Sends a `DescribeApplication` request for `application_id` to the worker of
    /// `chain_id` and returns the application description.
    #[instrument(level = "trace", skip(self, chain_id, application_id))]
    pub async fn describe_application(
        &self,
        chain_id: ChainId,
        application_id: ApplicationId,
    ) -> Result<ApplicationDescription, WorkerError> {
        self.query_chain_worker(chain_id, move |callback| {
            ChainWorkerRequest::DescribeApplication {
                application_id,
                callback,
            }
        })
        .await
    }
569
570 #[instrument(
572 level = "trace",
573 skip(self, certificate, notify_when_messages_are_delivered)
574 )]
575 async fn process_confirmed_block(
576 &self,
577 certificate: ConfirmedBlockCertificate,
578 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
579 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
580 let chain_id = certificate.block().header.chain_id;
581
582 let (response, actions) = self
583 .query_chain_worker(chain_id, move |callback| {
584 ChainWorkerRequest::ProcessConfirmedBlock {
585 certificate,
586 notify_when_messages_are_delivered,
587 callback,
588 }
589 })
590 .await?;
591
592 #[cfg(with_metrics)]
593 metrics::NUM_BLOCKS.with_label_values(&[]).inc();
594
595 Ok((response, actions))
596 }
597
    /// Sends a `ProcessValidatedBlock` request for `certificate` to the worker of the
    /// chain named in the block header.
    ///
    /// The `bool` in the returned tuple is treated by `handle_validated_certificate`
    /// as a "duplicated" flag.
    #[instrument(level = "trace", skip(self, certificate))]
    async fn process_validated_block(
        &self,
        certificate: ValidatedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
        let chain_id = certificate.block().header.chain_id;

        self.query_chain_worker(chain_id, move |callback| {
            ChainWorkerRequest::ProcessValidatedBlock {
                certificate,
                callback,
            }
        })
        .await
    }
614
    /// Sends a `ProcessTimeout` request for `certificate` to the worker of the chain
    /// named by the certificate's value.
    #[instrument(level = "trace", skip(self, certificate))]
    async fn process_timeout(
        &self,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        let chain_id = certificate.value().chain_id();
        self.query_chain_worker(chain_id, move |callback| {
            ChainWorkerRequest::ProcessTimeout {
                certificate,
                callback,
            }
        })
        .await
    }
630
    /// Sends a `ProcessCrossChainUpdate` request with the bundles from `origin` to the
    /// worker of the `recipient` chain.
    ///
    /// Returns the optional block height reported by the worker; when it is `None`,
    /// `handle_cross_chain_request` produces no follow-up actions.
    #[instrument(level = "trace", skip(self, origin, recipient, bundles))]
    async fn process_cross_chain_update(
        &self,
        origin: ChainId,
        recipient: ChainId,
        bundles: Vec<(Epoch, MessageBundle)>,
    ) -> Result<Option<BlockHeight>, WorkerError> {
        self.query_chain_worker(recipient, move |callback| {
            ChainWorkerRequest::ProcessCrossChainUpdate {
                origin,
                bundles,
                callback,
            }
        })
        .await
    }
647
    /// Testing only: sends a `ReadCertificate` request for `height` to the worker of
    /// `chain_id` and returns the confirmed block certificate it reports, if any.
    #[instrument(level = "trace", skip(self, chain_id, height))]
    #[cfg(with_testing)]
    pub async fn read_certificate(
        &self,
        chain_id: ChainId,
        height: BlockHeight,
    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
        self.query_chain_worker(chain_id, move |callback| {
            ChainWorkerRequest::ReadCertificate { height, callback }
        })
        .await
    }
661
    /// Sends a `GetChainStateView` request to the worker of `chain_id` and returns an
    /// owned read guard over that chain's `ChainStateView`.
    // NOTE(review): the guard presumably keeps a read lock on the chain state for as
    // long as it is held, so callers should drop it promptly. Confirm against the
    // chain worker.
    #[instrument(level = "trace", skip(self))]
    pub async fn chain_state_view(
        &self,
        chain_id: ChainId,
    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
        self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
            callback,
        })
        .await
    }
677
678 #[instrument(level = "trace", skip(self, request_builder))]
679 async fn query_chain_worker<Response>(
681 &self,
682 chain_id: ChainId,
683 request_builder: impl FnOnce(
684 oneshot::Sender<Result<Response, WorkerError>>,
685 ) -> ChainWorkerRequest<StorageClient::Context>,
686 ) -> Result<Response, WorkerError> {
687 let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
688 let (callback, response) = oneshot::channel();
689
690 chain_actor
691 .send((request_builder(callback), tracing::Span::current()))
692 .expect("`ChainWorkerActor` stopped executing unexpectedly");
693
694 response
695 .await
696 .expect("`ChainWorkerActor` stopped executing without responding")
697 }
698
699 #[instrument(level = "trace", skip(self))]
702 async fn get_chain_worker_endpoint(
703 &self,
704 chain_id: ChainId,
705 ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
706 let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
707 loop {
708 match self.try_get_chain_worker_endpoint(chain_id) {
709 Some(endpoint) => break endpoint,
710 None => sleep(Duration::from_millis(250)).await,
711 }
712 warn!("No chain worker candidates found for eviction, retrying...");
713 }
714 })
715 .await
716 .map_err(|_| WorkerError::FullChainWorkerCache)?;
717
718 if let Some(receiver) = new_receiver {
719 let delivery_notifier = self
720 .delivery_notifiers
721 .lock()
722 .unwrap()
723 .entry(chain_id)
724 .or_default()
725 .clone();
726
727 let actor_task = ChainWorkerActor::run(
728 self.chain_worker_config.clone(),
729 self.storage.clone(),
730 self.block_cache.clone(),
731 self.execution_state_cache.clone(),
732 self.tracked_chains.clone(),
733 delivery_notifier,
734 chain_id,
735 receiver,
736 );
737
738 self.chain_worker_tasks
739 .lock()
740 .unwrap()
741 .spawn_task(actor_task);
742 }
743
744 Ok(sender)
745 }
746
747 #[instrument(level = "trace", skip(self))]
752 #[expect(clippy::type_complexity)]
753 fn try_get_chain_worker_endpoint(
754 &self,
755 chain_id: ChainId,
756 ) -> Option<(
757 ChainActorEndpoint<StorageClient>,
758 Option<
759 mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
760 >,
761 )> {
762 let mut chain_workers = self.chain_workers.lock().unwrap();
763
764 if let Some(endpoint) = chain_workers.get(&chain_id) {
765 Some((endpoint.clone(), None))
766 } else {
767 if chain_workers.len() >= usize::from(chain_workers.cap()) {
768 let (chain_to_evict, _) = chain_workers
769 .iter()
770 .rev()
771 .find(|(_, candidate_endpoint)| candidate_endpoint.strong_count() <= 1)?;
772 let chain_to_evict = *chain_to_evict;
773
774 chain_workers.pop(&chain_to_evict);
775 self.clean_up_finished_chain_workers(&chain_workers);
776 }
777
778 let (sender, receiver) = mpsc::unbounded_channel();
779 chain_workers.push(chain_id, sender.clone());
780
781 Some((sender, Some(receiver)))
782 }
783 }
784
785 fn clean_up_finished_chain_workers(
787 &self,
788 active_chain_workers: &LruCache<ChainId, ChainActorEndpoint<StorageClient>>,
789 ) {
790 self.chain_worker_tasks
791 .lock()
792 .unwrap()
793 .reap_finished_tasks();
794
795 self.delivery_notifiers
796 .lock()
797 .unwrap()
798 .retain(|chain_id, notifier| {
799 !notifier.is_empty() || active_chain_workers.contains(chain_id)
800 });
801 }
802
803 #[instrument(skip_all, fields(
804 nick = self.nickname,
805 chain_id = format!("{:.8}", proposal.content.block.chain_id),
806 height = %proposal.content.block.height,
807 ))]
808 pub async fn handle_block_proposal(
809 &self,
810 proposal: BlockProposal,
811 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
812 trace!("{} <-- {:?}", self.nickname, proposal);
813 #[cfg(with_metrics)]
814 let round = proposal.content.round;
815 let response = self
816 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
817 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
818 })
819 .await?;
820 #[cfg(with_metrics)]
821 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
822 .with_label_values(&[round.type_name()])
823 .observe(round.number() as f64);
824 Ok(response)
825 }
826
827 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
830 pub async fn handle_lite_certificate<'a>(
831 &self,
832 certificate: LiteCertificate<'a>,
833 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
834 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
835 match self.full_certificate(certificate).await? {
836 Either::Left(confirmed) => {
837 self.handle_confirmed_certificate(confirmed, notify_when_messages_are_delivered)
838 .await
839 }
840 Either::Right(validated) => {
841 if let Some(notifier) = notify_when_messages_are_delivered {
842 if let Err(()) = notifier.send(()) {
844 warn!("Failed to notify message delivery to caller");
845 }
846 }
847 self.handle_validated_certificate(validated).await
848 }
849 }
850 }
851
852 #[instrument(skip_all, fields(
854 nick = self.nickname,
855 chain_id = format!("{:.8}", certificate.block().header.chain_id),
856 height = %certificate.block().header.height,
857 ))]
858 pub async fn handle_confirmed_certificate(
859 &self,
860 certificate: ConfirmedBlockCertificate,
861 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
862 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
863 trace!("{} <-- {:?}", self.nickname, certificate);
864 #[cfg(with_metrics)]
865 {
866 let confirmed_transactions = (certificate.block().body.incoming_bundles.len()
867 + certificate.block().body.operations.len())
868 as u64;
869
870 metrics::NUM_ROUNDS_IN_CERTIFICATE
871 .with_label_values(&[
872 certificate.inner().to_log_str(),
873 certificate.round.type_name(),
874 ])
875 .observe(certificate.round.number() as f64);
876 if confirmed_transactions > 0 {
877 metrics::TRANSACTION_COUNT
878 .with_label_values(&[])
879 .inc_by(confirmed_transactions);
880 }
881
882 for (validator_name, _) in certificate.signatures() {
883 metrics::CERTIFICATES_SIGNED
884 .with_label_values(&[&validator_name.to_string()])
885 .inc();
886 }
887 }
888
889 self.process_confirmed_block(certificate, notify_when_messages_are_delivered)
890 .await
891 }
892
893 #[instrument(skip_all, fields(
897 nick = self.nickname,
898 chain_id = format!("{:.8}", certificate.block().header.chain_id),
899 height = %certificate.block().header.height,
900 ))]
901 pub async fn fully_preprocess_certificate_with_notifications(
902 &self,
903 certificate: ConfirmedBlockCertificate,
904 notifier: &impl Notifier,
905 ) -> Result<(), WorkerError> {
906 trace!("{} <-- {:?} (preprocess)", self.nickname, certificate);
907
908 let notifications = (*notifier).clone();
909 let this = self.clone();
910 linera_base::task::spawn(async move {
911 let actions = this
912 .query_chain_worker(certificate.block().header.chain_id, move |callback| {
913 ChainWorkerRequest::PreprocessCertificate {
914 certificate,
915 callback,
916 }
917 })
918 .await?;
919 notifications.notify(&actions.notifications);
920 let mut requests = VecDeque::from(actions.cross_chain_requests);
921 while let Some(request) = requests.pop_front() {
922 let actions = this.handle_cross_chain_request(request).await?;
923 requests.extend(actions.cross_chain_requests);
924 notifications.notify(&actions.notifications);
925 }
926 Ok(())
927 })
928 .await
929 .unwrap_or_else(|_| Err(WorkerError::JoinError))
930 }
931
932 #[instrument(skip_all, fields(
934 nick = self.nickname,
935 chain_id = format!("{:.8}", certificate.block().header.chain_id),
936 height = %certificate.block().header.height,
937 ))]
938 pub async fn handle_validated_certificate(
939 &self,
940 certificate: ValidatedBlockCertificate,
941 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
942 trace!("{} <-- {:?}", self.nickname, certificate);
943
944 #[cfg(with_metrics)]
945 let round = certificate.round;
946 #[cfg(with_metrics)]
947 let cert_str = certificate.inner().to_log_str();
948
949 let (info, actions, _duplicated) = self.process_validated_block(certificate).await?;
950 #[cfg(with_metrics)]
951 {
952 if !_duplicated {
953 metrics::NUM_ROUNDS_IN_CERTIFICATE
954 .with_label_values(&[cert_str, round.type_name()])
955 .observe(round.number() as f64);
956 }
957 }
958 Ok((info, actions))
959 }
960
    /// Handles a timeout certificate by logging it at trace level and processing it on
    /// the worker of the certificate's chain.
    #[instrument(skip_all, fields(
        nick = self.nickname,
        chain_id = format!("{:.8}", certificate.inner().chain_id()),
        height = %certificate.inner().height(),
    ))]
    pub async fn handle_timeout_certificate(
        &self,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        trace!("{} <-- {:?}", self.nickname, certificate);
        self.process_timeout(certificate).await
    }
974
    /// Forwards a chain info query to the worker of `query.chain_id` and returns its
    /// response and network actions.
    ///
    /// Both the query and the result (success or error) are logged at trace level.
    #[instrument(skip_all, fields(
        nick = self.nickname,
        chain_id = format!("{:.8}", query.chain_id)
    ))]
    pub async fn handle_chain_info_query(
        &self,
        query: ChainInfoQuery,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        trace!("{} <-- {:?}", self.nickname, query);
        let result = self
            .query_chain_worker(query.chain_id, move |callback| {
                ChainWorkerRequest::HandleChainInfoQuery { query, callback }
            })
            .await;
        trace!("{} --> {:?}", self.nickname, result);
        result
    }
992
    /// Sends a `DownloadPendingBlob` request for `blob_id` to the worker of `chain_id`
    /// and returns the blob it reports.
    ///
    /// The outgoing trace logs only the blob ID on success, never the blob contents.
    #[instrument(skip_all, fields(
        nick = self.nickname,
        chain_id = format!("{:.8}", chain_id)
    ))]
    pub async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<Blob, WorkerError> {
        // NOTE(review): `{chain_id:8}` / `{blob_id:8}` set a minimum width, whereas the
        // span field above uses `{:.8}` (precision). Confirm which is intended.
        trace!(
            "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
            self.nickname
        );
        let result = self
            .query_chain_worker(chain_id, move |callback| {
                ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
            })
            .await;
        trace!(
            "{} --> {:?}",
            self.nickname,
            result.as_ref().map(|_| blob_id)
        );
        result
    }
1018
    /// Sends a `HandlePendingBlob` request carrying `blob` to the worker of `chain_id`
    /// and returns the resulting chain info response.
    ///
    /// The outgoing trace logs only the blob ID on success, never the response body.
    #[instrument(skip_all, fields(
        nick = self.nickname,
        chain_id = format!("{:.8}", chain_id)
    ))]
    pub async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: Blob,
    ) -> Result<ChainInfoResponse, WorkerError> {
        // Taken before `blob` is moved into the request, for use in the trace messages.
        let blob_id = blob.id();
        // NOTE(review): `{chain_id:8}` / `{blob_id:8}` set a minimum width, whereas the
        // span field above uses `{:.8}` (precision). Confirm which is intended.
        trace!(
            "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
            self.nickname
        );
        let result = self
            .query_chain_worker(chain_id, move |callback| {
                ChainWorkerRequest::HandlePendingBlob { blob, callback }
            })
            .await;
        trace!(
            "{} --> {:?}",
            self.nickname,
            result.as_ref().map(|_| blob_id)
        );
        result
    }
1045
1046 #[instrument(skip_all, fields(
1047 nick = self.nickname,
1048 chain_id = format!("{:.8}", request.target_chain_id())
1049 ))]
1050 pub async fn handle_cross_chain_request(
1051 &self,
1052 request: CrossChainRequest,
1053 ) -> Result<NetworkActions, WorkerError> {
1054 trace!("{} <-- {:?}", self.nickname, request);
1055 match request {
1056 CrossChainRequest::UpdateRecipient {
1057 sender,
1058 recipient,
1059 bundles,
1060 } => {
1061 let mut actions = NetworkActions::default();
1062 let origin = sender;
1063 let Some(height) = self
1064 .process_cross_chain_update(origin, recipient, bundles)
1065 .await?
1066 else {
1067 return Ok(actions);
1068 };
1069 actions.notifications.push(Notification {
1070 chain_id: recipient,
1071 reason: Reason::NewIncomingBundle { origin, height },
1072 });
1073 actions
1074 .cross_chain_requests
1075 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1076 sender,
1077 recipient,
1078 latest_height: height,
1079 });
1080 Ok(actions)
1081 }
1082 CrossChainRequest::ConfirmUpdatedRecipient {
1083 sender,
1084 recipient,
1085 latest_height,
1086 } => {
1087 self.query_chain_worker(sender, move |callback| {
1088 ChainWorkerRequest::ConfirmUpdatedRecipient {
1089 recipient,
1090 latest_height,
1091 callback,
1092 }
1093 })
1094 .await?;
1095 Ok(NetworkActions::default())
1096 }
1097 }
1098 }
1099
    /// Sends an `UpdateReceivedCertificateTrackers` request carrying `new_trackers`
    /// (one `u64` tracker value per validator public key) to the worker of `chain_id`.
    pub async fn update_received_certificate_trackers(
        &self,
        chain_id: ChainId,
        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
    ) -> Result<(), WorkerError> {
        self.query_chain_worker(chain_id, move |callback| {
            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
                new_trackers,
                callback,
            }
        })
        .await
    }
1114}
1115
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the public key of the key pair in the chain worker configuration.
    ///
    /// Only available under `cfg(with_testing)`.
    ///
    /// # Panics
    ///
    /// Panics if the configuration has no key pair, for example when the worker was
    /// created with `new_for_client` or with `new(.., None, ..)`.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        self.chain_worker_config
            .key_pair()
            .expect(
                "Test validator should have a key pair assigned to it \
                in order to obtain its public key",
            )
            .public()
    }
}