1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7 future::Future,
8 pin,
9 sync::{Arc, Mutex, RwLock},
10 time::Duration,
11};
12
13use futures::{
14 future::{self, Either, Shared, WeakShared},
15 FutureExt as _,
16};
17use linera_base::{
18 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
19 data_types::{
20 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
21 Timestamp,
22 },
23 doc_scalar,
24 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
25};
26use linera_cache::{Arc as CacheArc, UniqueValueCache, ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
27#[cfg(with_testing)]
28use linera_chain::ChainExecutionContext;
29use linera_chain::{
30 data_types::{BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock},
31 types::{
32 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
33 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
34 },
35 ChainError, ChainStateView,
36};
37use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
38use linera_storage::{Clock as _, Storage};
39use linera_views::{context::InactiveContext, ViewError};
40use serde::{Deserialize, Serialize};
41use thiserror::Error;
42use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
43use tracing::{debug, instrument, trace, warn};
44
/// Read-only handle on a chain's [`ChainStateView`].
///
/// Wraps an [`OwnedRwLockReadGuard`] over a `ChainWorkerState<S>` lock that has been narrowed
/// to the `ChainStateView<S::Context>` it exposes.
pub struct ChainStateViewReadGuard<S: Storage>(
    OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
);
54
55impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
56 type Target = ChainStateView<S::Context>;
57
58 fn deref(&self) -> &Self::Target {
59 &self.0
60 }
61}
62
63pub(crate) use crate::chain_worker::EventSubscriptionsResult;
65use crate::{
66 chain_worker::{
67 handle,
68 state::{send_result, ChainWorkerState},
69 BlockOutcome, ChainWorkerConfig, CrossChainUpdateResult, DeliveryNotifier,
70 ProcessConfirmedBlockMode,
71 },
72 client::{ChainModes, ListeningMode},
73 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
74 notifier::Notifier,
75};
76
/// Default size of the block cache.
// NOTE(review): presumably counted in cached entries — confirm against `ValueCache`.
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000;
/// Default size of the execution state cache.
// NOTE(review): presumably counted in cached entries — confirm against `UniqueValueCache`.
pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
79
// Test module loaded from `unit_tests/worker_tests.rs`; compiled only in test builds.
#[cfg(test)]
#[path = "unit_tests/worker_tests.rs"]
mod worker_tests;

// Test module loaded from `unit_tests/worker_backup_tests.rs`; compiled only in test builds
// that also enable the `rocksdb` feature.
#[cfg(all(test, feature = "rocksdb"))]
#[path = "unit_tests/worker_backup_tests.rs"]
mod worker_backup_tests;
87
88#[cfg(not(web))]
91pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
92 sync_wrapper::SyncFuture::new(f)
93}
94
/// Web variant: returns the future unchanged.
#[cfg(web)]
pub(crate) fn wrap_future<F>(f: F) -> F
where
    F: std::future::Future,
{
    f
}
101
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics recorded by the worker; only compiled with `with_metrics`.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec,
    };
    use linera_chain::{data_types::MessageAction, types::ConfirmedBlockCertificate};
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Round number of each recorded certificate, labelled by certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Number of rounds in block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions in recorded certificates.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles in recorded certificates.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of incoming bundles whose action is `MessageAction::Reject`.
    pub static REJECTED_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("rejected_bundle_count", "Rejected bundle count"));

    /// Running total of messages contained in incoming bundles.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations in recorded certificates.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Incremented once per recorded certificate.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Per-validator count of signatures found on recorded certificates.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Distribution of the number of requests handled together in one per-chain batch.
    pub static CROSS_CHAIN_BATCH_SIZE: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "cross_chain_batch_size",
            "Number of cross-chain requests coalesced into a single per-chain batch",
            exponential_bucket_interval(1.0, 1000.0),
        )
    });

    /// Figures extracted from a confirmed block certificate, kept so that the metrics can be
    /// recorded later via [`MetricsData::record`].
    pub struct MetricsData {
        /// Log label of the certificate value.
        certificate_log_str: &'static str,
        /// Type name of the certificate's round.
        round_type: &'static str,
        /// Number of the certificate's round.
        round_number: u32,
        /// Number of transactions in the block body.
        confirmed_transactions: u64,
        /// Number of incoming bundles in the block body.
        confirmed_incoming_bundles: u64,
        /// Number of incoming bundles whose action is `MessageAction::Reject`.
        confirmed_rejected_bundles: u64,
        /// Total number of messages across all incoming bundles.
        confirmed_incoming_messages: u64,
        /// Number of operations in the block body.
        confirmed_operations: u64,
        /// String form of every validator name appearing in the certificate's signatures.
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Collects the metric inputs from `certificate` without touching any metric.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;

            let confirmed_transactions = body.transactions.len() as u64;
            let confirmed_incoming_bundles = body.incoming_bundles().count() as u64;
            let confirmed_rejected_bundles = body
                .incoming_bundles()
                .filter(|b| b.action == MessageAction::Reject)
                .count() as u64;
            let confirmed_incoming_messages = body
                .incoming_bundles()
                .map(|b| b.messages().count())
                .sum::<usize>() as u64;
            let confirmed_operations = body.operations().count() as u64;
            let validators_with_signatures = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: certificate.round.type_name(),
                round_number: certificate.round.number(),
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_rejected_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            }
        }

        /// Publishes the collected figures to the Prometheus metrics of this module.
        ///
        /// The per-block histograms are always observed. The running counters are only
        /// touched when the block has at least one transaction, and each individual counter
        /// is only increased when its amount is non-zero.
        pub fn record(self) {
            /// Adds `amount` to `counter`, skipping the call entirely for zero.
            fn add_if_positive(counter: &IntCounter, amount: u64) {
                if amount > 0 {
                    counter.inc_by(amount);
                }
            }

            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions: transactions,
                confirmed_incoming_bundles: incoming_bundles,
                confirmed_rejected_bundles: rejected_bundles,
                confirmed_incoming_messages: incoming_messages,
                confirmed_operations: operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(round_number as f64);
            TRANSACTIONS_PER_BLOCK.observe(transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(operations as f64);

            if transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(transactions);
                add_if_positive(&INCOMING_BUNDLE_COUNT, incoming_bundles);
                add_if_positive(&REJECTED_BUNDLE_COUNT, rejected_bundles);
                add_if_positive(&INCOMING_MESSAGE_COUNT, incoming_messages);
                add_if_positive(&OPERATION_COUNT, operations);
            }

            for validator_name in &validators_with_signatures {
                CERTIFICATES_SIGNED
                    .with_label_values(&[validator_name])
                    .inc();
            }
        }
    }
}
276
/// Follow-up work produced while handling a request: cross-chain requests to dispatch and
/// notifications to deliver.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// Cross-chain requests to be sent or handled next.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// Notifications to pass on to a notifier.
    pub notifications: Vec<Notification>,
}
285
286impl NetworkActions {
287 pub fn extend(&mut self, other: NetworkActions) {
288 self.cross_chain_requests.extend(other.cross_chain_requests);
289 self.notifications.extend(other.notifications);
290 }
291}
292
/// A notification about a chain: which chain it concerns and why it was emitted.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Notification {
    /// The chain the notification is about.
    pub chain_id: ChainId,
    /// What happened on that chain.
    pub reason: Reason,
}

// Registers `Notification` with the `doc_scalar!` macro together with its description string.
doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
304
/// The reason carried by a [`Notification`].
// NOTE(review): the per-variant descriptions below are inferred from the variant and field
// names only; confirm against the code that emits each notification.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Reason {
    /// A new block, identified by its height and hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// New events on the listed streams, with the height and hash of the associated block.
    NewEvents {
        height: BlockHeight,
        block_hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
    /// A new incoming bundle from chain `origin` at the given height.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round at the given height.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
    /// A block, identified by its height and hash, was executed.
    BlockExecuted {
        height: BlockHeight,
        hash: CryptoHash,
    },
}
330
/// Errors returned by the worker.
///
/// The `strum::IntoStaticStr` derive provides the variant name as a `&'static str`, which
/// `WorkerError::error_type` uses to build its label.
#[derive(Debug, Error, strum::IntoStaticStr)]
pub enum WorkerError {
    /// A cryptographic error, forwarded unchanged.
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    /// An arithmetic error, forwarded unchanged.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// A view error, forwarded unchanged.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Certificates with these hashes are referenced by the chain state but are not in storage.
    #[error("Certificates referenced from chain state are missing in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// A chain error, boxed and forwarded unchanged.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    /// A BCS (de)serialization error, forwarded unchanged.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    // Chain access control
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
         is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
         ({local_time}) than block time grace period ({block_time_grace_period:?})"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period: Duration,
    },
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    #[error("Blocks not found: {0:?}")]
    BlocksNotFound(Vec<CryptoHash>),
    #[error("Block hash at height {height} for chain {chain_id} not found")]
    BlockHashNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("Block at height {height} on chain {chain_id} not found in local storage")]
    LocalBlockNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    #[error("Missing network description")]
    MissingNetworkDescription,
    #[error("thread error: {0}")]
    Thread(#[from] web_thread_pool::Error),
    /// The chain worker can no longer be used; `must_reload_view` reports `true` for it.
    #[error("Chain worker was poisoned by a journal resolution failure")]
    PoisonedWorker,
    /// The request itself may be fine, but the batch it belonged to was rolled back.
    #[error("Cross-chain batch was rolled back due to an error in another request")]
    BatchRolledBack,
}
433
434impl WorkerError {
435 pub fn is_local(&self) -> bool {
439 match self {
440 WorkerError::CryptoError(_)
441 | WorkerError::ArithmeticError(_)
442 | WorkerError::InvalidOwner
443 | WorkerError::InvalidSigner(_)
444 | WorkerError::UnexpectedBlockHeight { .. }
445 | WorkerError::InvalidEpoch { .. }
446 | WorkerError::EventsNotFound(_)
447 | WorkerError::InvalidBlockChaining
448 | WorkerError::InvalidTimestamp { .. }
449 | WorkerError::MissingCertificateValue
450 | WorkerError::InvalidLiteCertificate
451 | WorkerError::FastBlockUsingOracles
452 | WorkerError::BlobsNotFound(_)
453 | WorkerError::BlocksNotFound(_)
454 | WorkerError::InvalidBlockProposal(_)
455 | WorkerError::UnexpectedBlob
456 | WorkerError::TooManyPublishedBlobs(_)
457 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
458 WorkerError::BcsError(_)
459 | WorkerError::InvalidCrossChainRequest
460 | WorkerError::ViewError(_)
461 | WorkerError::BlockHashNotFound { .. }
462 | WorkerError::LocalBlockNotFound { .. }
463 | WorkerError::MissingNetworkDescription
464 | WorkerError::Thread(_)
465 | WorkerError::ReadCertificatesError(_)
466 | WorkerError::PoisonedWorker
467 | WorkerError::BatchRolledBack => true,
468 WorkerError::ChainError(chain_error) => chain_error.is_local(),
469 }
470 }
471
472 pub fn error_type(&self) -> String {
478 match self {
479 WorkerError::ChainError(chain_error) => chain_error.error_type(),
480 other => {
481 let variant: &'static str = other.into();
482 format!("WorkerError::{variant}")
483 }
484 }
485 }
486
487 fn must_reload_view(&self) -> bool {
490 matches!(
491 self,
492 WorkerError::PoisonedWorker
493 | WorkerError::ViewError(ViewError::StoreError {
494 must_reload_view: true,
495 ..
496 })
497 )
498 }
499
500 fn indicates_corrupted_chain_state(&self) -> bool {
504 matches!(
505 self,
506 WorkerError::ChainError(chain_error)
507 if matches!(chain_error.as_ref(), ChainError::CorruptedChainState(_))
508 )
509 }
510}
511
/// Converts a [`ChainError`] into a [`WorkerError`].
///
/// Execution errors reporting missing blobs or events are lifted into the dedicated
/// [`WorkerError::BlobsNotFound`] / [`WorkerError::EventsNotFound`] variants; everything else
/// is boxed into [`WorkerError::ChainError`].
impl From<ChainError> for WorkerError {
    #[instrument(level = "trace", skip(chain_error))]
    fn from(chain_error: ChainError) -> Self {
        match chain_error {
            // Matching on `*execution_error` moves out of the box only in the arms that bind
            // its contents; the fallback arm re-wraps the still-intact box.
            ChainError::ExecutionError(execution_error, context) => match *execution_error {
                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
                _ => Self::ChainError(Box::new(ChainError::ExecutionError(
                    execution_error,
                    context,
                ))),
            },
            error => Self::ChainError(Box::new(error)),
        }
    }
}
528
#[cfg(with_testing)]
impl WorkerError {
    /// Test helper: unwraps the [`ExecutionError`] inside a
    /// `WorkerError::ChainError(ChainError::ExecutionError(..))`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not such an error, or if the execution context differs from
    /// `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
550
/// Strong, shared handle on a chain worker's state behind an async read-write lock.
type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Weak counterpart of [`ChainWorkerArc`]; does not keep the worker state alive.
type ChainWorkerWeak<S> = std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Cloneable future that resolves to a [`ChainWorkerWeak`] once it is sent on the oneshot
/// channel, or to an error if the sender is dropped first.
type ChainWorkerFuture<S> = Shared<oneshot::Receiver<ChainWorkerWeak<S>>>;

/// Shared concurrent map from chain ID to that chain's [`ChainWorkerFuture`].
type ChainWorkerMap<S> = Arc<papaya::HashMap<ChainId, ChainWorkerFuture<S>>>;
563
/// A request queued for a chain's batch processor; each variant carries the oneshot sender
/// on which its result is reported.
// NOTE(review): field meanings below are inferred from names — confirm against
// `ChainWorkerState::process_batch`.
pub(crate) enum BatchRequest {
    /// Cross-chain update: bundles sent by chain `origin`.
    Update {
        origin: ChainId,
        bundles: Vec<(Epoch, MessageBundle)>,
        previous_height: Option<BlockHeight>,
        result_sender: oneshot::Sender<Result<CrossChainUpdateResult, WorkerError>>,
    },
    /// Confirmation concerning chain `recipient`, up to `latest_height`.
    Confirm {
        recipient: ChainId,
        latest_height: BlockHeight,
        result_sender: oneshot::Sender<Result<NetworkActions, WorkerError>>,
    },
}
578
/// Boxed, pinned future driving a chain's batch processing loop (`Send` on non-web targets).
#[cfg(not(web))]
type BatchFuture = pin::Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future driving a chain's batch processing loop (no `Send` bound on web).
#[cfg(web)]
type BatchFuture = pin::Pin<Box<dyn Future<Output = ()>>>;
589
/// Handle on a chain's batch processing loop.
#[derive(Clone)]
struct ChainBatchRequestProcessor {
    /// Queue on which [`BatchRequest`]s are submitted to the loop.
    sender: mpsc::UnboundedSender<BatchRequest>,
    /// Weak reference to the shared loop future; it can no longer be upgraded once all
    /// strong `Shared` handles are gone.
    future: WeakShared<BatchFuture>,
}
598
599impl ChainBatchRequestProcessor {
600 fn create<StorageClient>(
601 state: ChainWorkerArc<StorageClient>,
602 batch_size_limit: usize,
603 ) -> (ChainBatchRequestProcessor, Shared<BatchFuture>)
604 where
605 StorageClient: Storage + Clone + 'static,
606 {
607 let (sender, mut receiver) = mpsc::unbounded_channel();
608 let future: BatchFuture = Box::pin(async move {
609 while let Some(first) = receiver.recv().await {
610 let mut requests = vec![first];
611 match handle::write_lock(&state).await {
612 Ok(mut guard) => {
613 while requests.len() < batch_size_limit {
614 match receiver.try_recv() {
615 Ok(request) => requests.push(request),
616 Err(_) => break,
617 }
618 }
619 #[cfg(with_metrics)]
620 metrics::CROSS_CHAIN_BATCH_SIZE.observe(requests.len() as f64);
621 guard.process_batch(requests).await
622 }
623 Err(error) => {
624 tracing::error!(%error, "failed to obtain write lock");
625 for request in requests {
626 match request {
627 BatchRequest::Update { result_sender, .. } => {
628 send_result(result_sender, Err(WorkerError::PoisonedWorker));
629 }
630 BatchRequest::Confirm { result_sender, .. } => {
631 send_result(result_sender, Err(WorkerError::PoisonedWorker));
632 }
633 }
634 }
635 }
636 }
637 }
638 });
639 let shared = future.shared();
640 let weak = shared.downgrade().expect("future has not been polled yet");
641 let batch_processor = ChainBatchRequestProcessor {
642 sender,
643 future: weak,
644 };
645 (batch_processor, shared)
646 }
647}
648
/// Shared concurrent map from chain ID to that chain's [`ChainBatchRequestProcessor`].
type ChainBatchMap = Arc<papaya::HashMap<ChainId, ChainBatchRequestProcessor>>;
650
651fn start_sweep<S: Storage + Clone + 'static>(
655 chain_workers: &ChainWorkerMap<S>,
656 config: &ChainWorkerConfig,
657) {
658 let interval = match (config.ttl, config.sender_chain_ttl) {
661 (None, None) => return,
662 (Some(d), None) | (None, Some(d)) => d,
663 (Some(a), Some(b)) => a.min(b),
664 };
665 let weak_map = Arc::downgrade(chain_workers);
666 linera_base::Task::spawn(async move {
667 loop {
668 linera_base::time::timer::sleep(interval).await;
669 let Some(map) = weak_map.upgrade() else {
670 break;
671 };
672 map.pin_owned().retain(|_, shared| match shared.peek() {
673 Some(Ok(weak)) => weak.strong_count() > 0,
674 Some(Err(_)) => false, None => true, });
677 }
678 })
679 .forget();
680}
681
/// State of a worker: storage access, caches, and the per-chain workers and batch processors.
pub struct WorkerState<StorageClient: Storage> {
    /// Access to the storage.
    storage: StorageClient,
    /// Configuration used for the chain workers.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of confirmed blocks, keyed by hash.
    block_cache: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Cache of execution states, keyed by hash; `None` when the configured size is zero.
    execution_state_cache:
        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
    /// Optional shared chain modes.
    // NOTE(review): presumably the per-chain listening modes of a client — confirm.
    pub(crate) chain_modes: Option<Arc<RwLock<ChainModes>>>,
    /// Delivery notifiers, one per chain.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// Chain workers (or futures resolving to them), one entry per chain.
    chain_workers: ChainWorkerMap<StorageClient>,
    /// Batch processors, one entry per chain.
    chain_batches: ChainBatchMap,
    /// Optional callback that receives outbound cross-chain requests.
    outbound_cross_chain_sender: Option<OutboundCrossChainSender>,
}
709
/// Shareable, thread-safe callback that is handed outbound [`CrossChainRequest`]s.
pub type OutboundCrossChainSender = Arc<dyn Fn(CrossChainRequest) + Send + Sync>;
713
714impl<StorageClient> Clone for WorkerState<StorageClient>
715where
716 StorageClient: Storage + Clone,
717{
718 fn clone(&self) -> Self {
719 WorkerState {
720 storage: self.storage.clone(),
721 chain_worker_config: self.chain_worker_config.clone(),
722 block_cache: self.block_cache.clone(),
723 execution_state_cache: self.execution_state_cache.clone(),
724 chain_modes: self.chain_modes.clone(),
725 delivery_notifiers: self.delivery_notifiers.clone(),
726 chain_workers: self.chain_workers.clone(),
727 chain_batches: self.chain_batches.clone(),
728 outbound_cross_chain_sender: self.outbound_cross_chain_sender.clone(),
729 }
730 }
731}
732
/// Map from chain ID to that chain's [`DeliveryNotifier`].
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
734
735impl<StorageClient> WorkerState<StorageClient>
736where
737 StorageClient: Storage,
738{
739 #[cfg(with_testing)]
741 #[instrument(level = "trace", skip(self))]
742 pub fn with_cross_chain_message_chunk_limit(mut self, limit: usize) -> Self {
743 self.chain_worker_config.cross_chain_message_chunk_limit = limit;
744 self
745 }
746
747 #[cfg(with_testing)]
749 pub fn set_cross_chain_message_chunk_limit(&mut self, limit: usize) {
750 self.chain_worker_config.cross_chain_message_chunk_limit = limit;
751 }
752
753 #[cfg(with_testing)]
754 #[instrument(level = "trace", skip(self, value))]
755 pub fn with_allow_revert_confirm(mut self, value: bool) -> Self {
756 self.chain_worker_config.allow_revert_confirm = value;
757 self
758 }
759
760 #[instrument(level = "trace", skip(self))]
761 pub fn nickname(&self) -> &str {
762 &self.chain_worker_config.nickname
763 }
764
765 #[instrument(level = "trace", skip(self))]
767 #[cfg(not(feature = "test"))]
768 pub(crate) fn storage_client(&self) -> &StorageClient {
769 &self.storage
770 }
771
772 #[instrument(level = "trace", skip(self))]
775 #[cfg(feature = "test")]
776 pub fn storage_client(&self) -> &StorageClient {
777 &self.storage
778 }
779
780 #[instrument(level = "trace", skip(self, certificate))]
781 pub(crate) async fn full_certificate(
782 &self,
783 certificate: LiteCertificate<'_>,
784 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
785 let block = self
786 .block_cache
787 .get(&certificate.value.value_hash)
788 .ok_or(WorkerError::MissingCertificateValue)?;
789 let block = CacheArc::unwrap_or_clone(block);
790
791 match certificate.value.kind {
792 linera_chain::types::CertificateKind::Confirmed => Ok(Either::Left(
793 certificate
794 .with_value(block)
795 .ok_or(WorkerError::InvalidLiteCertificate)?,
796 )),
797 linera_chain::types::CertificateKind::Validated => {
798 let value = ValidatedBlock::from_hashed(block.into_inner());
799 Ok(Either::Right(
800 certificate
801 .with_value(value)
802 .ok_or(WorkerError::InvalidLiteCertificate)?,
803 ))
804 }
805 _ => Err(WorkerError::InvalidLiteCertificate),
806 }
807 }
808}
809
/// Certificate values that a [`WorkerState`] knows how to process.
///
/// On non-web targets, `trait_variant::make(Send)` is applied to the trait.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker`, returning the chain info response together with
    /// the resulting [`NetworkActions`].
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
818
819impl ProcessableCertificate for ConfirmedBlock {
820 async fn process_certificate<S: Storage + Clone + 'static>(
821 worker: &WorkerState<S>,
822 certificate: ConfirmedBlockCertificate,
823 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
824 Box::pin(worker.handle_confirmed_certificate(
825 certificate,
826 ProcessConfirmedBlockMode::Auto,
827 None,
828 ))
829 .await
830 }
831}
832
833impl ProcessableCertificate for ValidatedBlock {
834 async fn process_certificate<S: Storage + Clone + 'static>(
835 worker: &WorkerState<S>,
836 certificate: ValidatedBlockCertificate,
837 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
838 Box::pin(worker.handle_validated_certificate(certificate)).await
839 }
840}
841
842impl ProcessableCertificate for Timeout {
843 async fn process_certificate<S: Storage + Clone + 'static>(
844 worker: &WorkerState<S>,
845 certificate: TimeoutCertificate,
846 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
847 worker.handle_timeout_certificate(certificate).await
848 }
849}
850
851impl<StorageClient> WorkerState<StorageClient>
852where
853 StorageClient: Storage + Clone + 'static,
854{
855 #[instrument(level = "trace", skip(storage, chain_worker_config))]
860 pub fn new(
861 storage: StorageClient,
862 chain_worker_config: ChainWorkerConfig,
863 chain_modes: Option<Arc<RwLock<ChainModes>>>,
864 ) -> Self {
865 let chain_workers = Arc::new(papaya::HashMap::new());
866 start_sweep(&chain_workers, &chain_worker_config);
867 let block_cache_size = chain_worker_config.block_cache_size;
868 let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
869 WorkerState {
870 storage,
871 chain_worker_config,
872 block_cache: Arc::new(ValueCache::new(
873 "worker_block",
874 block_cache_size,
875 DEFAULT_CLEANUP_INTERVAL_SECS,
876 )),
877 execution_state_cache: (execution_state_cache_size > 0)
878 .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))),
879 chain_modes,
880 delivery_notifiers: Arc::default(),
881 chain_workers,
882 #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
886 chain_batches: Arc::new(papaya::HashMap::new()),
887 outbound_cross_chain_sender: None,
888 }
889 }
890
891 pub fn with_outbound_cross_chain_sender(mut self, sender: OutboundCrossChainSender) -> Self {
896 self.outbound_cross_chain_sender = Some(sender);
897 self
898 }
899
900 #[instrument(level = "trace", skip(self, certificate, notifier))]
901 #[inline]
902 pub async fn fully_handle_certificate_with_notifications<T>(
903 &self,
904 certificate: GenericCertificate<T>,
905 notifier: &impl Notifier,
906 ) -> Result<ChainInfoResponse, WorkerError>
907 where
908 T: ProcessableCertificate,
909 {
910 let notifications = (*notifier).clone();
911 let this = self.clone();
912 linera_base::Task::spawn(async move {
913 let (response, actions) =
914 ProcessableCertificate::process_certificate(&this, certificate).await?;
915 notifications.notify(&actions.notifications);
916 let mut requests = VecDeque::from(actions.cross_chain_requests);
917 while let Some(request) = requests.pop_front() {
918 let actions = this.handle_cross_chain_request(request).await?;
919 requests.extend(actions.cross_chain_requests);
920 notifications.notify(&actions.notifications);
921 }
922 Ok(response)
923 })
924 .await
925 }
926
927 #[instrument(level = "trace", skip(self, certificate, notifier))]
931 #[inline]
932 pub async fn fully_handle_confirmed_certificate_with_notifications(
933 &self,
934 certificate: ConfirmedBlockCertificate,
935 mode: ProcessConfirmedBlockMode,
936 notifier: &impl Notifier,
937 ) -> Result<ChainInfoResponse, WorkerError> {
938 let notifications = (*notifier).clone();
939 let this = self.clone();
940 linera_base::Task::spawn(async move {
941 let (response, actions) =
942 Box::pin(this.handle_confirmed_certificate(certificate, mode, None)).await?;
943 notifications.notify(&actions.notifications);
944 let mut requests = VecDeque::from(actions.cross_chain_requests);
945 while let Some(request) = requests.pop_front() {
946 let actions = this.handle_cross_chain_request(request).await?;
947 requests.extend(actions.cross_chain_requests);
948 notifications.notify(&actions.notifications);
949 }
950 Ok(response)
951 })
952 .await
953 }
954
955 async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
960 where
961 F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
962 Fut: std::future::Future<Output = Result<R, WorkerError>>,
963 {
964 let state = self.get_or_create_chain_worker(chain_id).await?;
965 let state_ref = &state;
966 let result = Box::pin(wrap_future(async move {
967 let guard = handle::read_lock(state_ref).await?;
968 f(guard).await
969 }))
970 .await;
971 if let Err(error) = &result {
972 if error.must_reload_view() {
973 self.evict_poisoned_worker(chain_id, &state);
974 }
975 }
976 result
977 }
978
979 async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
988 where
989 F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut
990 + linera_base::task::MaybeSend
991 + 'static,
992 Fut: std::future::Future<Output = Result<R, WorkerError>> + linera_base::task::MaybeSend,
993 R: linera_base::task::MaybeSend + 'static,
994 {
995 let state = self.get_or_create_chain_worker(chain_id).await?;
996 let state_for_task = state.clone();
997 let result = Box::pin(wrap_future(linera_base::task::run_detached(async move {
998 let guard = handle::write_lock(&state_for_task).await?;
999 f(guard).await
1000 })))
1001 .await;
1002 if let Err(error) = &result {
1003 if error.must_reload_view() {
1004 self.evict_poisoned_worker(chain_id, &state);
1005 } else if error.indicates_corrupted_chain_state() {
1006 self.spawn_reset_corrupted_chain_state(chain_id, state);
1007 }
1008 }
1009 result
1010 }
1011
1012 fn spawn_reset_corrupted_chain_state(
1023 &self,
1024 chain_id: ChainId,
1025 state: ChainWorkerArc<StorageClient>,
1026 ) where
1027 StorageClient: Clone,
1028 {
1029 let this = self.clone();
1030 linera_base::Task::spawn(async move {
1031 let requests = {
1032 let mut guard = match handle::write_lock(&state).await {
1033 Ok(guard) => guard,
1034 Err(error) => {
1035 tracing::error!(
1036 %chain_id, %error,
1037 "Failed to acquire write lock to reset corrupted chain state"
1038 );
1039 return;
1040 }
1041 };
1042 match guard.maybe_reset_corrupted_chain_state().await {
1043 Ok(Some(requests)) => requests,
1044 Ok(None) => return,
1045 Err(error) => {
1046 tracing::error!(
1047 %chain_id, %error, "Failed to reset corrupted chain state"
1048 );
1049 return;
1050 }
1051 }
1052 };
1053 if let Some(sender) = &this.outbound_cross_chain_sender {
1054 for request in requests {
1057 sender(request);
1058 }
1059 } else {
1060 let mut queue = VecDeque::from(requests);
1064 while let Some(request) = queue.pop_front() {
1065 match this.handle_cross_chain_request(request).await {
1066 Ok(actions) => queue.extend(actions.cross_chain_requests),
1067 Err(error) => {
1068 warn!(
1069 %chain_id, %error,
1070 "Failed to dispatch cross-chain request after \
1071 resetting corrupted chain state"
1072 );
1073 }
1074 }
1075 }
1076 }
1077 })
1078 .forget();
1079 }
1080
1081 fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
1085 tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
1086 let pin = self.chain_workers.pin();
1087 let weak_poisoned = Arc::downgrade(poisoned);
1088 let removed = pin.remove_if(&chain_id, |_key, future| {
1089 future
1090 .peek()
1091 .and_then(|r| r.clone().ok())
1092 .is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
1093 });
1094 if removed.is_err() {
1095 tracing::trace!(%chain_id, "Poisoned worker entry already replaced; skipping eviction");
1096 }
1097 }
1098
1099 async fn get_or_create_chain_batch(
1101 &self,
1102 chain_id: ChainId,
1103 ) -> Result<(mpsc::UnboundedSender<BatchRequest>, Shared<BatchFuture>), WorkerError> {
1104 if let Some(batch_processor) = self.chain_batches.pin().get(&chain_id) {
1107 if let Some(future) = batch_processor.future.upgrade() {
1108 return Ok((batch_processor.sender.clone(), future));
1109 }
1110 }
1111 let state = self.get_or_create_chain_worker(chain_id).await?;
1112 let (new_request_processor, new_future) = ChainBatchRequestProcessor::create(
1113 state,
1114 self.chain_worker_config.cross_chain_batch_size_limit,
1115 );
1116 match self
1117 .chain_batches
1118 .pin()
1119 .compute(chain_id, |existing| match existing {
1120 Some((_, batch_processor)) => {
1121 if let Some(future) = batch_processor.future.upgrade() {
1122 papaya::Operation::Abort((batch_processor.sender.clone(), future))
1123 } else {
1124 papaya::Operation::Insert(new_request_processor.clone())
1125 }
1126 }
1127 None => papaya::Operation::Insert(new_request_processor.clone()),
1128 }) {
1129 papaya::Compute::Aborted((sender, future)) => Ok((sender, future)),
1130 papaya::Compute::Inserted(_, batch_processor)
1131 | papaya::Compute::Updated {
1132 new: (_, batch_processor),
1133 ..
1134 } => Ok((batch_processor.sender.clone(), new_future)),
1135 papaya::Compute::Removed { .. } => unreachable!(),
1136 }
1137 }
1138
1139 fn get_or_create_chain_worker(
1150 &self,
1151 chain_id: ChainId,
1152 ) -> std::pin::Pin<
1153 Box<
1154 impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
1155 >,
1156 > {
1157 Box::pin(wrap_future(async move {
1158 loop {
1159 let (sender, receiver) = oneshot::channel();
1162 let shared_receiver = receiver.shared();
1163
1164 let wait_or_sender = {
1167 let pin = self.chain_workers.pin();
1168 match pin.compute(chain_id, |existing| match existing {
1169 Some((_, entry)) => match entry.peek() {
1170 Some(Ok(weak)) => match weak.upgrade() {
1171 Some(arc) => papaya::Operation::Abort(Ok(arc)),
1172 None => papaya::Operation::Insert(shared_receiver.clone()),
1173 },
1174 Some(Err(_)) => papaya::Operation::Insert(shared_receiver.clone()),
1175 None => papaya::Operation::Abort(Err(entry.clone())),
1176 },
1177 None => papaya::Operation::Insert(shared_receiver.clone()),
1178 }) {
1179 papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc),
1180 papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait),
1181 papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => {
1182 Either::Right(sender)
1183 }
1184 papaya::Compute::Removed { .. } => unreachable!(),
1185 }
1186 };
1187
1188 match wait_or_sender {
1189 Either::Left(wait) => {
1190 if let Ok(weak) = wait.await {
1192 if let Some(arc) = weak.upgrade() {
1193 return Ok(arc);
1194 }
1195 }
1196 }
1198 Either::Right(sender) => {
1199 let worker = self.load_chain_worker(chain_id).await?;
1203 if sender.send(Arc::downgrade(&worker)).is_err() {
1204 tracing::error!(%chain_id, "Receiver dropped while loading worker state.");
1205 continue;
1206 }
1207 return Ok(worker);
1208 }
1209 }
1210 }
1211 }))
1212 }
1213
1214 async fn load_chain_worker(
1216 &self,
1217 chain_id: ChainId,
1218 ) -> Result<ChainWorkerArc<StorageClient>, WorkerError> {
1219 let delivery_notifier = self
1220 .delivery_notifiers
1221 .lock()
1222 .unwrap()
1223 .entry(chain_id)
1224 .or_default()
1225 .clone();
1226
1227 let is_tracked = self.chain_modes.as_ref().is_none_or(|chain_modes| {
1231 chain_modes
1232 .read()
1233 .unwrap()
1234 .get(&chain_id)
1235 .is_some_and(ListeningMode::is_full)
1236 });
1237
1238 let (service_runtime_endpoint, service_runtime_task) =
1239 if self.chain_worker_config.long_lived_services {
1240 let actor =
1241 handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
1242 (Some(actor.endpoint), Some(actor.task))
1243 } else {
1244 (None, None)
1245 };
1246
1247 let state = crate::chain_worker::state::ChainWorkerState::load(
1248 self.chain_worker_config.clone(),
1249 self.storage.clone(),
1250 self.block_cache.clone(),
1251 self.execution_state_cache.clone(),
1252 self.chain_modes.clone(),
1253 delivery_notifier,
1254 chain_id,
1255 service_runtime_endpoint,
1256 service_runtime_task,
1257 )
1258 .await?;
1259
1260 Ok(handle::create_chain_worker(
1261 state,
1262 is_tracked,
1263 &self.chain_worker_config,
1264 ))
1265 }
1266
1267 #[instrument(level = "trace", skip(self, block))]
1272 pub async fn stage_block_execution(
1273 &self,
1274 block: ProposedBlock,
1275 round: Option<u32>,
1276 published_blobs: Vec<Blob>,
1277 policy: BundleExecutionPolicy,
1278 ) -> Result<
1279 (
1280 ProposedBlock,
1281 Block,
1282 ChainInfoResponse,
1283 ResourceTracker,
1284 HashSet<ChainId>,
1285 ),
1286 WorkerError,
1287 > {
1288 let chain_id = block.chain_id;
1289 self.chain_write(chain_id, move |mut guard| async move {
1290 guard
1291 .stage_block_execution(block, round, &published_blobs, policy)
1292 .await
1293 })
1294 .await
1295 }
1296
1297 #[instrument(level = "trace", skip(self, chain_id, query))]
1302 pub async fn query_application(
1303 &self,
1304 chain_id: ChainId,
1305 query: Query,
1306 block_hash: Option<CryptoHash>,
1307 ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
1308 self.chain_write(chain_id, move |mut guard| async move {
1309 guard.query_application(query, block_hash).await
1310 })
1311 .await
1312 }
1313
1314 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
1315 nickname = %self.nickname(),
1316 chain_id = %chain_id,
1317 application_id = %application_id
1318 ))]
1319 pub async fn describe_application(
1320 &self,
1321 chain_id: ChainId,
1322 application_id: ApplicationId,
1323 ) -> Result<ApplicationDescription, WorkerError> {
1324 let state = self.get_or_create_chain_worker(chain_id).await?;
1325 let guard = handle::read_lock_initialized(&state).await?;
1326 guard.describe_application_readonly(application_id).await
1327 }
1328
1329 #[instrument(
1331 level = "trace",
1332 skip(self, certificate, notify_when_messages_are_delivered),
1333 fields(
1334 nickname = %self.nickname(),
1335 chain_id = %certificate.block().header.chain_id,
1336 block_height = %certificate.block().header.height
1337 )
1338 )]
1339 async fn process_confirmed_block(
1340 &self,
1341 certificate: ConfirmedBlockCertificate,
1342 mode: ProcessConfirmedBlockMode,
1343 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1344 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1345 let chain_id = certificate.block().header.chain_id;
1346 self.chain_write(chain_id, move |mut guard| async move {
1347 guard
1348 .process_confirmed_block(certificate, mode, notify_when_messages_are_delivered)
1349 .await
1350 })
1351 .await
1352 }
1353
1354 #[instrument(level = "trace", skip(self, certificate), fields(
1356 nickname = %self.nickname(),
1357 chain_id = %certificate.block().header.chain_id,
1358 block_height = %certificate.block().header.height
1359 ))]
1360 async fn process_validated_block(
1361 &self,
1362 certificate: ValidatedBlockCertificate,
1363 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1364 let chain_id = certificate.block().header.chain_id;
1365 self.chain_write(chain_id, move |mut guard| async move {
1366 guard.process_validated_block(certificate).await
1367 })
1368 .await
1369 }
1370
1371 #[instrument(level = "trace", skip(self, certificate), fields(
1373 nickname = %self.nickname(),
1374 chain_id = %certificate.value().chain_id(),
1375 height = %certificate.value().height()
1376 ))]
1377 async fn process_timeout(
1378 &self,
1379 certificate: TimeoutCertificate,
1380 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1381 let chain_id = certificate.value().chain_id();
1382 self.chain_write(chain_id, move |mut guard| async move {
1383 guard.process_timeout(certificate).await
1384 })
1385 .await
1386 }
1387
1388 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
1391 nickname = %self.nickname(),
1392 origin = %origin,
1393 recipient = %recipient,
1394 num_bundles = %bundles.len()
1395 ))]
1396 async fn process_cross_chain_update(
1397 &self,
1398 origin: ChainId,
1399 recipient: ChainId,
1400 bundles: Vec<(Epoch, MessageBundle)>,
1401 previous_height: Option<BlockHeight>,
1402 ) -> Result<CrossChainUpdateResult, WorkerError> {
1403 let (result_sender, receiver) = oneshot::channel();
1404 let request = BatchRequest::Update {
1405 origin,
1406 bundles,
1407 previous_height,
1408 result_sender,
1409 };
1410 self.enqueue_and_drive(recipient, request, receiver).await
1411 }
1412
1413 async fn confirm_updated_recipient(
1416 &self,
1417 sender: ChainId,
1418 recipient: ChainId,
1419 latest_height: BlockHeight,
1420 ) -> Result<NetworkActions, WorkerError> {
1421 let (result_sender, receiver) = oneshot::channel();
1422 let request = BatchRequest::Confirm {
1423 recipient,
1424 latest_height,
1425 result_sender,
1426 };
1427 self.enqueue_and_drive(sender, request, receiver).await
1428 }
1429
1430 async fn enqueue_and_drive<R>(
1433 &self,
1434 chain_id: ChainId,
1435 request: BatchRequest,
1436 mut receiver: oneshot::Receiver<Result<R, WorkerError>>,
1437 ) -> Result<R, WorkerError> {
1438 let mut pending = Some(request);
1439 loop {
1440 let (sender, future) = self.get_or_create_chain_batch(chain_id).await?;
1441 if let Some(request) = pending.take() {
1442 if let Err(mpsc::error::SendError(request)) = sender.send(request) {
1443 pending = Some(request);
1444 continue; }
1446 }
1447 match future::select(pin::pin!(&mut receiver), future).await {
1450 Either::Left((result, _)) => {
1451 return result.expect("batch result sender dropped");
1452 }
1453 Either::Right(((), _)) => match receiver.try_recv() {
1454 Ok(result) => return result,
1455 Err(oneshot::error::TryRecvError::Empty) => {}
1456 Err(oneshot::error::TryRecvError::Closed) => {
1457 return Err(WorkerError::ChainError(Box::new(
1458 ChainError::InternalError("batch driver stopped".into()),
1459 )));
1460 }
1461 },
1462 }
1463 }
1464 }
1465
1466 #[instrument(level = "trace", skip(self, chain_id, height), fields(
1468 nickname = %self.nickname(),
1469 chain_id = %chain_id,
1470 height = %height
1471 ))]
1472 #[cfg(with_testing)]
1473 pub async fn read_certificate(
1474 &self,
1475 chain_id: ChainId,
1476 height: BlockHeight,
1477 ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, WorkerError> {
1478 let state = self.get_or_create_chain_worker(chain_id).await?;
1479 let guard = handle::read_lock_initialized(&state).await?;
1480 guard.read_certificate(height).await
1481 }
1482
1483 #[cfg(with_testing)]
1486 pub async fn select_message_bundles(
1487 &self,
1488 recipient: ChainId,
1489 origin: &ChainId,
1490 next_height_to_receive: BlockHeight,
1491 last_anticipated_block_height: Option<BlockHeight>,
1492 bundles: Vec<(Epoch, MessageBundle)>,
1493 ) -> Result<Vec<MessageBundle>, WorkerError> {
1494 let state = self.get_or_create_chain_worker(recipient).await?;
1495 let guard = handle::read_lock(&state).await?;
1496 guard
1497 .select_message_bundles(
1498 origin,
1499 next_height_to_receive,
1500 last_anticipated_block_height,
1501 bundles,
1502 )
1503 .await
1504 }
1505
1506 #[instrument(level = "trace", skip(self), fields(
1512 nickname = %self.nickname(),
1513 chain_id = %chain_id
1514 ))]
1515 pub async fn chain_state_view(
1516 &self,
1517 chain_id: ChainId,
1518 ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1519 let state = self.get_or_create_chain_worker(chain_id).await?;
1520 let guard = handle::read_lock(&state).await?;
1521 Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1522 guard,
1523 |s| s.chain(),
1524 )))
1525 }
1526
1527 #[instrument(skip_all, fields(
1528 nick = self.nickname(),
1529 chain_id = format!("{:.8}", proposal.content.block.chain_id),
1530 height = %proposal.content.block.height,
1531 ))]
1532 pub async fn handle_block_proposal(
1533 &self,
1534 proposal: BlockProposal,
1535 ) -> (Result<ChainInfoResponse, WorkerError>, NetworkActions) {
1536 trace!("{} <-- {:?}", self.nickname(), proposal);
1537 #[cfg(with_metrics)]
1538 let round = proposal.content.round;
1539
1540 let chain_id = proposal.content.block.chain_id;
1541 let now = self.storage.clock().current_time();
1543 let block_timestamp = proposal.content.block.timestamp;
1544 let delta = block_timestamp.delta_since(now);
1545 let grace_period = TimeDelta::from_micros(
1546 u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1547 .unwrap_or(u64::MAX),
1548 );
1549 if delta > TimeDelta::ZERO && delta <= grace_period {
1550 self.storage.clock().sleep_until(block_timestamp).await;
1551 }
1552
1553 let outcome = self
1554 .chain_write(chain_id, move |mut guard| async move {
1555 Ok::<_, WorkerError>(guard.handle_block_proposal(proposal).await)
1556 })
1557 .await;
1558 let (result, actions) = match outcome {
1559 Ok((result, actions)) => (result, actions),
1560 Err(err) => (Err(err), NetworkActions::default()),
1561 };
1562 #[cfg(with_metrics)]
1563 if result.is_ok() {
1564 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1565 .with_label_values(&[round.type_name()])
1566 .observe(round.number() as f64);
1567 }
1568 (result, actions)
1569 }
1570
1571 #[instrument(skip_all, fields(
1574 chain_id = %certificate.value.chain_id,
1575 hash = %certificate.value.value_hash,
1576 ))]
1577 pub async fn handle_lite_certificate(
1578 &self,
1579 certificate: LiteCertificate<'_>,
1580 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1581 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1582 match self.full_certificate(certificate).await? {
1583 Either::Left(confirmed) => {
1584 Box::pin(self.handle_confirmed_certificate(
1585 confirmed,
1586 ProcessConfirmedBlockMode::Auto,
1587 notify_when_messages_are_delivered,
1588 ))
1589 .await
1590 }
1591 Either::Right(validated) => {
1592 if let Some(notifier) = notify_when_messages_are_delivered {
1593 if let Err(()) = notifier.send(()) {
1595 debug!("Failed to notify message delivery to caller (validation cert)");
1596 }
1597 }
1598 Box::pin(self.handle_validated_certificate(validated)).await
1599 }
1600 }
1601 }
1602
1603 #[instrument(skip_all, fields(
1605 nick = self.nickname(),
1606 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1607 height = %certificate.block().header.height,
1608 ))]
1609 pub async fn handle_confirmed_certificate(
1610 &self,
1611 certificate: ConfirmedBlockCertificate,
1612 mode: ProcessConfirmedBlockMode,
1613 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1614 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1615 trace!("{} <-- {:?}", self.nickname(), certificate);
1616 #[cfg(with_metrics)]
1617 let metrics_data = metrics::MetricsData::new(&certificate);
1618
1619 #[allow(unused_variables)]
1620 let (info, actions, outcome) = Box::pin(self.process_confirmed_block(
1621 certificate,
1622 mode,
1623 notify_when_messages_are_delivered,
1624 ))
1625 .await?;
1626
1627 #[cfg(with_metrics)]
1628 if matches!(outcome, BlockOutcome::Processed) {
1629 metrics_data.record();
1630 }
1631 Ok((info, actions))
1632 }
1633
1634 #[instrument(skip_all, fields(
1636 nick = self.nickname(),
1637 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1638 height = %certificate.block().header.height,
1639 ))]
1640 pub async fn handle_validated_certificate(
1641 &self,
1642 certificate: ValidatedBlockCertificate,
1643 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1644 trace!("{} <-- {:?}", self.nickname(), certificate);
1645
1646 #[cfg(with_metrics)]
1647 let round = certificate.round;
1648 #[cfg(with_metrics)]
1649 let cert_str = certificate.inner().to_log_str();
1650
1651 #[allow(unused_variables)]
1652 let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1653 #[cfg(with_metrics)]
1654 {
1655 if matches!(outcome, BlockOutcome::Processed) {
1656 metrics::NUM_ROUNDS_IN_CERTIFICATE
1657 .with_label_values(&[cert_str, round.type_name()])
1658 .observe(round.number() as f64);
1659 }
1660 }
1661 Ok((info, actions))
1662 }
1663
1664 #[instrument(skip_all, fields(
1666 nick = self.nickname(),
1667 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1668 height = %certificate.inner().height(),
1669 ))]
1670 pub async fn handle_timeout_certificate(
1671 &self,
1672 certificate: TimeoutCertificate,
1673 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1674 trace!("{} <-- {:?}", self.nickname(), certificate);
1675 self.process_timeout(certificate).await
1676 }
1677
1678 #[instrument(skip_all, fields(
1679 nick = self.nickname(),
1680 chain_id = format!("{:.8}", query.chain_id)
1681 ))]
1682 pub async fn handle_chain_info_query(
1683 &self,
1684 query: ChainInfoQuery,
1685 ) -> Result<ChainInfoResponse, WorkerError> {
1686 trace!("{} <-- {:?}", self.nickname(), query);
1687 #[cfg(with_metrics)]
1688 metrics::CHAIN_INFO_QUERIES.inc();
1689 let chain_id = query.chain_id;
1690 let result = self
1691 .chain_write(chain_id, move |mut guard| async move {
1692 guard.handle_chain_info_query(query).await
1693 })
1694 .await;
1695 trace!("{} --> {:?}", self.nickname(), result);
1696 result
1697 }
1698
1699 #[instrument(skip_all, fields(
1700 nick = self.nickname(),
1701 chain_id = format!("{:.8}", chain_id)
1702 ))]
1703 pub async fn download_pending_blob(
1704 &self,
1705 chain_id: ChainId,
1706 blob_id: BlobId,
1707 ) -> Result<CacheArc<Blob>, WorkerError> {
1708 trace!("{} <-- download_pending_blob({blob_id:8})", self.nickname());
1709 let result = self
1710 .chain_read(chain_id, |guard| async move {
1711 guard.download_pending_blob(blob_id).await
1712 })
1713 .await;
1714 trace!(
1715 "{} --> {:?}",
1716 self.nickname(),
1717 result.as_ref().map(|_| blob_id)
1718 );
1719 result
1720 }
1721
1722 #[instrument(skip_all, fields(
1723 nick = self.nickname(),
1724 chain_id = format!("{:.8}", chain_id)
1725 ))]
1726 pub async fn handle_pending_blob(
1727 &self,
1728 chain_id: ChainId,
1729 blob: Blob,
1730 ) -> Result<ChainInfoResponse, WorkerError> {
1731 let blob_id = blob.id();
1732 trace!("{} <-- handle_pending_blob({blob_id:8})", self.nickname());
1733 let result = self
1734 .chain_write(chain_id, move |mut guard| async move {
1735 guard.handle_pending_blob(blob).await
1736 })
1737 .await;
1738 trace!(
1739 "{} --> {:?}",
1740 self.nickname(),
1741 result.as_ref().map(|_| blob_id)
1742 );
1743 result
1744 }
1745
1746 #[instrument(skip_all, fields(
1747 nick = self.nickname(),
1748 chain_id = format!("{:.8}", request.target_chain_id())
1749 ))]
1750 pub async fn handle_cross_chain_request(
1751 &self,
1752 request: CrossChainRequest,
1753 ) -> Result<NetworkActions, WorkerError> {
1754 trace!("{} <-- {:?}", self.nickname(), request);
1755 match request {
1756 CrossChainRequest::UpdateRecipient {
1757 sender,
1758 recipient,
1759 bundles,
1760 previous_height,
1761 } => {
1762 let mut actions = NetworkActions::default();
1763 let origin = sender;
1764 match self
1765 .process_cross_chain_update(origin, recipient, bundles, previous_height)
1766 .await?
1767 {
1768 CrossChainUpdateResult::NothingToDo => {}
1769 CrossChainUpdateResult::Updated(height) => {
1770 actions.notifications.push(Notification {
1771 chain_id: recipient,
1772 reason: Reason::NewIncomingBundle { origin, height },
1773 });
1774 actions.cross_chain_requests.push(
1775 CrossChainRequest::ConfirmUpdatedRecipient {
1776 sender,
1777 recipient,
1778 latest_height: height,
1779 },
1780 );
1781 }
1782 CrossChainUpdateResult::GapDetected {
1783 origin,
1784 retransmit_from,
1785 } => {
1786 actions
1787 .cross_chain_requests
1788 .push(CrossChainRequest::RevertConfirm {
1789 sender: origin,
1790 recipient,
1791 retransmit_from,
1792 });
1793 }
1794 }
1795 Ok(actions)
1796 }
1797 CrossChainRequest::ConfirmUpdatedRecipient {
1798 sender,
1799 recipient,
1800 latest_height,
1801 } => {
1802 let actions = self
1803 .confirm_updated_recipient(sender, recipient, latest_height)
1804 .await?;
1805 Ok(actions)
1806 }
1807 CrossChainRequest::RevertConfirm {
1808 sender,
1809 recipient,
1810 retransmit_from,
1811 } => {
1812 self.chain_write(sender, move |mut guard| async move {
1813 guard
1814 .handle_revert_confirm(recipient, retransmit_from)
1815 .await
1816 })
1817 .await
1818 }
1819 }
1820 }
1821
1822 #[instrument(skip_all, fields(
1824 nickname = %self.nickname(),
1825 chain_id = %chain_id,
1826 num_trackers = %new_trackers.len()
1827 ))]
1828 pub async fn update_received_certificate_trackers(
1829 &self,
1830 chain_id: ChainId,
1831 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1832 ) -> Result<(), WorkerError> {
1833 self.chain_write(chain_id, move |mut guard| async move {
1834 guard
1835 .update_received_certificate_trackers(new_trackers)
1836 .await
1837 })
1838 .await
1839 }
1840
1841 #[instrument(skip_all, fields(
1843 nickname = %self.nickname(),
1844 chain_id = %chain_id,
1845 start = %start,
1846 end = %end
1847 ))]
1848 pub async fn get_preprocessed_block_hashes(
1849 &self,
1850 chain_id: ChainId,
1851 start: BlockHeight,
1852 end: BlockHeight,
1853 ) -> Result<Vec<CryptoHash>, WorkerError> {
1854 self.chain_read(chain_id, |guard| async move {
1855 guard.get_preprocessed_block_hashes(start, end).await
1856 })
1857 .await
1858 }
1859
1860 #[instrument(skip_all, fields(
1862 nickname = %self.nickname(),
1863 chain_id = %chain_id,
1864 origin = %origin
1865 ))]
1866 pub async fn get_inbox_next_height(
1867 &self,
1868 chain_id: ChainId,
1869 origin: ChainId,
1870 ) -> Result<BlockHeight, WorkerError> {
1871 self.chain_read(chain_id, |guard| async move {
1872 guard.get_inbox_next_height(origin).await
1873 })
1874 .await
1875 }
1876
1877 #[instrument(skip_all, fields(
1880 nickname = %self.nickname(),
1881 chain_id = %chain_id,
1882 num_blob_ids = %blob_ids.len()
1883 ))]
1884 pub async fn get_locking_blobs(
1885 &self,
1886 chain_id: ChainId,
1887 blob_ids: Vec<BlobId>,
1888 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1889 self.chain_read(chain_id, |guard| async move {
1890 guard.get_locking_blobs(blob_ids).await
1891 })
1892 .await
1893 }
1894
1895 pub async fn get_block_hashes(
1897 &self,
1898 chain_id: ChainId,
1899 heights: Vec<BlockHeight>,
1900 ) -> Result<Vec<CryptoHash>, WorkerError> {
1901 self.chain_read(chain_id, |guard| async move {
1902 guard.get_block_hashes(heights).await
1903 })
1904 .await
1905 }
1906
1907 pub async fn get_proposed_blobs(
1909 &self,
1910 chain_id: ChainId,
1911 blob_ids: Vec<BlobId>,
1912 ) -> Result<Vec<Blob>, WorkerError> {
1913 self.chain_read(chain_id, |guard| async move {
1914 guard.get_proposed_blobs(blob_ids).await
1915 })
1916 .await
1917 }
1918
1919 pub async fn get_event_subscriptions(
1921 &self,
1922 chain_id: ChainId,
1923 ) -> Result<EventSubscriptionsResult, WorkerError> {
1924 self.chain_read(chain_id, |guard| async move {
1925 guard.get_event_subscriptions().await
1926 })
1927 .await
1928 }
1929
1930 pub async fn get_next_expected_event(
1932 &self,
1933 chain_id: ChainId,
1934 stream_id: StreamId,
1935 ) -> Result<Option<u32>, WorkerError> {
1936 self.chain_read(chain_id, |guard| async move {
1937 guard.get_next_expected_event(stream_id).await
1938 })
1939 .await
1940 }
1941
1942 pub async fn next_expected_events(
1944 &self,
1945 chain_id: ChainId,
1946 stream_ids: Vec<StreamId>,
1947 ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1948 self.chain_read(chain_id, |guard| async move {
1949 guard.get_next_expected_events(stream_ids).await
1950 })
1951 .await
1952 }
1953
1954 pub async fn get_received_certificate_trackers(
1956 &self,
1957 chain_id: ChainId,
1958 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1959 self.chain_read(chain_id, |guard| async move {
1960 guard.get_received_certificate_trackers().await
1961 })
1962 .await
1963 }
1964
1965 pub async fn cross_chain_network_actions(
1969 &self,
1970 chain_id: ChainId,
1971 ) -> Result<NetworkActions, WorkerError> {
1972 if let Some(actions) = self
1976 .chain_read(chain_id, |guard| async move {
1977 guard.cross_chain_network_actions_if_reconciled().await
1978 })
1979 .await?
1980 {
1981 return Ok(actions);
1982 }
1983 self.chain_write(chain_id, |mut guard| async move {
1986 guard.reconcile_and_cross_chain_network_actions().await
1987 })
1988 .await
1989 }
1990
1991 pub async fn get_tip_state_and_outbox_info(
1993 &self,
1994 chain_id: ChainId,
1995 receiver_id: ChainId,
1996 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1997 self.chain_read(chain_id, |guard| async move {
1998 guard.get_tip_state_and_outbox_info(receiver_id).await
1999 })
2000 .await
2001 }
2002
2003 pub async fn get_next_height_to_preprocess(
2005 &self,
2006 chain_id: ChainId,
2007 ) -> Result<BlockHeight, WorkerError> {
2008 self.chain_read(chain_id, |guard| async move {
2009 Ok(guard.get_next_height_to_preprocess())
2010 })
2011 .await
2012 }
2013}
2014
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage + Clone + 'static,
{
    /// Test-only accessor for the validator's public key.
    ///
    /// # Panics
    ///
    /// Panics if the worker configuration has no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain its public key",
        );
        key_pair.public()
    }
}