1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::{
12 future::{Either, Shared},
13 FutureExt as _,
14};
15use linera_base::{
16 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
17 data_types::{
18 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
19 Timestamp,
20 },
21 doc_scalar,
22 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
23};
24use linera_cache::{UniqueValueCache, ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
25#[cfg(with_testing)]
26use linera_chain::ChainExecutionContext;
27use linera_chain::{
28 data_types::{BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock},
29 types::{
30 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
31 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
32 },
33 ChainError, ChainStateView,
34};
35use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
36use linera_storage::{Clock as _, Storage};
37use linera_views::{context::InactiveContext, ViewError};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40use tokio::sync::{oneshot, OwnedRwLockReadGuard};
41use tracing::{instrument, trace, warn};
42
/// Read guard giving access to a chain's [`ChainStateView`].
///
/// Newtype around an [`OwnedRwLockReadGuard`] over a [`ChainWorkerState`] whose
/// guarded value is the chain state view. The `Deref` implementation exposes the view
/// directly.
pub struct ChainStateViewReadGuard<S: Storage>(
    OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
);
52
53impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
54 type Target = ChainStateView<S::Context>;
55
56 fn deref(&self) -> &Self::Target {
57 &self.0
58 }
59}
60
61pub(crate) use crate::chain_worker::EventSubscriptionsResult;
63use crate::{
64 chain_worker::{
65 handle, state::ChainWorkerState, BlockOutcome, ChainWorkerConfig, CrossChainUpdateResult,
66 DeliveryNotifier,
67 },
68 client::ListeningMode,
69 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
70 notifier::Notifier,
71};
72
/// Default size of the block cache.
// NOTE(review): not referenced in this part of the file; presumably the default for
// `ChainWorkerConfig::block_cache_size` — confirm.
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5_000;
/// Default size of the execution state cache.
// NOTE(review): presumably the default for `ChainWorkerConfig::execution_state_cache_size`
// — confirm.
pub const DEFAULT_EXECUTION_STATE_CACHE_SIZE: usize = 10_000;
75
// Unit tests for this module; compiled only for test builds and kept in a separate file.
#[cfg(test)]
#[path = "unit_tests/worker_tests.rs"]
mod worker_tests;
79
/// Wraps `f` in a [`sync_wrapper::SyncFuture`] (non-web builds only).
///
/// The web build provides a variant with the same name that returns the future
/// unchanged, so callers can use `wrap_future` unconditionally.
#[cfg(not(web))]
pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
    sync_wrapper::SyncFuture::new(f)
}
86
/// Web-build counterpart of `wrap_future`: returns the future unchanged.
#[cfg(web)]
pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> F {
    f
}
93
/// Prometheus metrics recorded by the worker (only built with `with_metrics`).
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec,
    };
    use linera_chain::{data_types::MessageAction, types::ConfirmedBlockCertificate};
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of the round number of certificates, labelled by certificate value
    /// and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number of block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions (label-less counter vector).
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of incoming bundles whose action is `Reject`.
    pub static REJECTED_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("rejected_bundle_count", "Rejected bundle count"));

    /// Running total of messages contained in incoming bundles.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Running total of blocks added to chains (label-less counter vector).
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Number of confirmed block certificates signed, per validator.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Figures extracted from a confirmed block certificate, kept so the metrics can be
    /// recorded later through [`MetricsData::record`] without the certificate itself.
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_rejected_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Collects the per-block figures from `certificate`.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;

            let incoming_bundles = body.incoming_bundles().count();
            let rejected_bundles = body
                .incoming_bundles()
                .filter(|bundle| bundle.action == MessageAction::Reject)
                .count();
            let incoming_messages: usize = body
                .incoming_bundles()
                .map(|bundle| bundle.messages().count())
                .sum();
            let validators_with_signatures = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: certificate.round.type_name(),
                round_number: certificate.round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: incoming_bundles as u64,
                confirmed_rejected_bundles: rejected_bundles as u64,
                confirmed_incoming_messages: incoming_messages as u64,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures,
            }
        }

        /// Pushes the collected figures into the Prometheus metrics.
        ///
        /// Counters are only touched when their increment is non-zero, and the
        /// bundle/message/operation counters are skipped entirely for a block without
        /// transactions.
        pub fn record(self) {
            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_rejected_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(f64::from(round_number));
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                if confirmed_incoming_bundles > 0 {
                    INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
                }
                if confirmed_rejected_bundles > 0 {
                    REJECTED_BUNDLE_COUNT.inc_by(confirmed_rejected_bundles);
                }
                if confirmed_incoming_messages > 0 {
                    INCOMING_MESSAGE_COUNT.inc_by(confirmed_incoming_messages);
                }
                if confirmed_operations > 0 {
                    OPERATION_COUNT.inc_by(confirmed_operations);
                }
            }

            validators_with_signatures
                .into_iter()
                .for_each(|validator_name| {
                    CERTIFICATES_SIGNED
                        .with_label_values(&[&validator_name])
                        .inc();
                });
        }
    }
}
260
/// Follow-up work produced while handling a request: cross-chain requests to dispatch
/// and notifications to deliver.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// Cross-chain requests to be handled next.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// Notifications to pass on to a notifier.
    pub notifications: Vec<Notification>,
}
269
270impl NetworkActions {
271 pub fn extend(&mut self, other: NetworkActions) {
272 self.cross_chain_requests.extend(other.cross_chain_requests);
273 self.notifications.extend(other.notifications);
274 }
275}
276
/// A notification about an event on a chain.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Notification {
    /// The chain the notification is about.
    pub chain_id: ChainId,
    /// What happened on that chain.
    pub reason: Reason,
}

doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
288
/// The reason carried by a [`Notification`].
// NOTE(review): the variant descriptions below are read off the variant and field names
// only; the emitting code is not visible here — confirm.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Reason {
    /// A new block, identified by its height and hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// New events in the listed streams, with the height and hash of the related block.
    NewEvents {
        height: BlockHeight,
        block_hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
    /// A new incoming bundle from chain `origin`, tagged with a block height.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round at the given height.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
    /// A block, identified by its height and hash, was executed.
    BlockExecuted {
        height: BlockHeight,
        hash: CryptoHash,
    },
}
314
/// Errors returned by the worker.
///
/// `strum::IntoStaticStr` provides the variant name used by `WorkerError::error_type`.
#[derive(Debug, Error, strum::IntoStaticStr)]
pub enum WorkerError {
    /// A cryptographic error, reported as-is.
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    /// An arithmetic error, reported as-is.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// An error from the views layer, reported as-is.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Certificates listed in the confirmed log could not be read from storage.
    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// A chain error, boxed and reported as-is.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    /// A BCS (de)serialization error, reported as-is.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    // Chain access control
    /// The block was not signed by an authorized owner.
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    /// Operations in the block are not authenticated by the proper owner.
    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
    InvalidSigner(AccountOwner),

    // Chaining
    /// The block height does not match the height the chain expects next.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    /// The given epoch is unexpected for a chain that is at `chain_epoch`.
    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    /// The listed events could not be found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors
    /// The cross-chain request is invalid.
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    /// The block does not reference the expected previous block hash.
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    /// The block timestamp is ahead of local time by more than the grace period.
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
        ({local_time}) than block time grace period ({block_time_grace_period:?})"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period: Duration,
    },
    /// The value of a lite certificate is not available (see `full_certificate`).
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    /// The lite certificate does not match the value it is combined with.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    /// A fast block tried to query an oracle.
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    /// The listed blobs could not be found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    /// No `confirmed_log` entry exists at this height for this chain.
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// No `preprocessed_blocks` entry exists at this height for this chain.
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// Neither `confirmed_log` nor `preprocessed_blocks` has an entry at this height.
    #[error(
        "confirmed_log/preprocessed_blocks entry at height {height} for chain {chain_id} not found"
    )]
    ConfirmedBlockHashNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// The block at this height is not in local storage.
    #[error("Block at height {height} on chain {chain_id} not found in local storage")]
    LocalBlockNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// The block proposal is invalid; the payload explains why.
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    /// A blob was provided that no pending block requires.
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    /// The block publishes more blobs than the given limit.
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    /// The network description is missing.
    #[error("Missing network description")]
    MissingNetworkDescription,
    /// An error from the thread pool.
    #[error("thread error: {0}")]
    Thread(#[from] web_thread_pool::Error),
    /// The chain worker was poisoned by a journal resolution failure; such a worker
    /// is evicted from the worker cache (see `must_reload_view`).
    #[error("Chain worker was poisoned by a journal resolution failure")]
    PoisonedWorker,
}
419
420impl WorkerError {
421 pub fn is_local(&self) -> bool {
425 match self {
426 WorkerError::CryptoError(_)
427 | WorkerError::ArithmeticError(_)
428 | WorkerError::InvalidOwner
429 | WorkerError::InvalidSigner(_)
430 | WorkerError::UnexpectedBlockHeight { .. }
431 | WorkerError::InvalidEpoch { .. }
432 | WorkerError::EventsNotFound(_)
433 | WorkerError::InvalidBlockChaining
434 | WorkerError::InvalidTimestamp { .. }
435 | WorkerError::MissingCertificateValue
436 | WorkerError::InvalidLiteCertificate
437 | WorkerError::FastBlockUsingOracles
438 | WorkerError::BlobsNotFound(_)
439 | WorkerError::InvalidBlockProposal(_)
440 | WorkerError::UnexpectedBlob
441 | WorkerError::TooManyPublishedBlobs(_)
442 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
443 WorkerError::BcsError(_)
444 | WorkerError::InvalidCrossChainRequest
445 | WorkerError::ViewError(_)
446 | WorkerError::ConfirmedLogEntryNotFound { .. }
447 | WorkerError::PreprocessedBlocksEntryNotFound { .. }
448 | WorkerError::ConfirmedBlockHashNotFound { .. }
449 | WorkerError::LocalBlockNotFound { .. }
450 | WorkerError::MissingNetworkDescription
451 | WorkerError::Thread(_)
452 | WorkerError::ReadCertificatesError(_)
453 | WorkerError::PoisonedWorker => true,
454 WorkerError::ChainError(chain_error) => chain_error.is_local(),
455 }
456 }
457
458 pub fn error_type(&self) -> String {
464 match self {
465 WorkerError::ChainError(chain_error) => chain_error.error_type(),
466 other => {
467 let variant: &'static str = other.into();
468 format!("WorkerError::{variant}")
469 }
470 }
471 }
472
473 fn must_reload_view(&self) -> bool {
476 matches!(
477 self,
478 WorkerError::PoisonedWorker
479 | WorkerError::ViewError(ViewError::StoreError {
480 must_reload_view: true,
481 ..
482 })
483 )
484 }
485
486 fn indicates_corrupted_chain_state(&self) -> bool {
490 matches!(
491 self,
492 WorkerError::ChainError(chain_error)
493 if matches!(chain_error.as_ref(), ChainError::CorruptedChainState(_))
494 )
495 }
496}
497
498impl From<ChainError> for WorkerError {
499 #[instrument(level = "trace", skip(chain_error))]
500 fn from(chain_error: ChainError) -> Self {
501 match chain_error {
502 ChainError::ExecutionError(execution_error, context) => match *execution_error {
503 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
504 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
505 _ => Self::ChainError(Box::new(ChainError::ExecutionError(
506 execution_error,
507 context,
508 ))),
509 },
510 error => Self::ChainError(Box::new(error)),
511 }
512 }
513}
514
#[cfg(with_testing)]
impl WorkerError {
    /// Test helper: unwraps the `ExecutionError` carried by this error.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not a `ChainError::ExecutionError`, or if its execution
    /// context differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        let (execution_error, context) = match *chain_error {
            ChainError::ExecutionError(execution_error, context) => (execution_error, context),
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        };

        assert_eq!(context, expected_context);

        *execution_error
    }
}
536
/// Strong, shared handle to a chain worker's state behind an async read/write lock.
type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Weak counterpart of [`ChainWorkerArc`]; it does not keep the worker alive.
type ChainWorkerWeak<S> = std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>;
/// Cloneable future that resolves to the weak handle once the worker has been loaded,
/// or to an error if the sending side was dropped first.
type ChainWorkerFuture<S> = Shared<oneshot::Receiver<ChainWorkerWeak<S>>>;

/// Shared map from chain ID to the (possibly still pending) worker for that chain.
/// Entries hold only weak handles, so the map alone does not keep workers alive.
type ChainWorkerMap<S> = Arc<papaya::HashMap<ChainId, ChainWorkerFuture<S>>>;
549
550fn start_sweep<S: Storage + Clone + 'static>(
554 chain_workers: &ChainWorkerMap<S>,
555 config: &ChainWorkerConfig,
556) {
557 let interval = match (config.ttl, config.sender_chain_ttl) {
560 (None, None) => return,
561 (Some(d), None) | (None, Some(d)) => d,
562 (Some(a), Some(b)) => a.min(b),
563 };
564 let weak_map = Arc::downgrade(chain_workers);
565 linera_base::Task::spawn(async move {
566 loop {
567 linera_base::time::timer::sleep(interval).await;
568 let Some(map) = weak_map.upgrade() else {
569 break;
570 };
571 map.pin_owned().retain(|_, shared| match shared.peek() {
572 Some(Ok(weak)) => weak.strong_count() > 0,
573 Some(Err(_)) => false, None => true, });
576 }
577 })
578 .forget();
579}
580
/// State of a worker: shared storage, configuration, caches and the per-chain workers.
pub struct WorkerState<StorageClient: Storage> {
    /// Storage client; cloned into every chain worker when it is loaded.
    storage: StorageClient,
    /// Configuration handed to every chain worker.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of confirmed blocks keyed by hash; `full_certificate` looks values up here.
    block_cache: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Cache of execution states keyed by hash; `None` when the configured size is zero.
    execution_state_cache:
        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
    /// Optional per-chain listening modes. When `None`, every chain is treated as
    /// tracked by `load_chain_worker`.
    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
    /// One delivery notifier per chain, created on demand when a worker is loaded.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The chain workers, stored as weak handles behind shared loading futures.
    chain_workers: ChainWorkerMap<StorageClient>,
    /// Optional callback receiving cross-chain requests produced when a corrupted chain
    /// state is reset; without it those requests are handled by this worker itself.
    outbound_cross_chain_sender: Option<OutboundCrossChainSender>,
}
606
/// Shareable callback that takes ownership of an outbound [`CrossChainRequest`].
pub type OutboundCrossChainSender = Arc<dyn Fn(CrossChainRequest) + Send + Sync>;
610
611impl<StorageClient> Clone for WorkerState<StorageClient>
612where
613 StorageClient: Storage + Clone,
614{
615 fn clone(&self) -> Self {
616 WorkerState {
617 storage: self.storage.clone(),
618 chain_worker_config: self.chain_worker_config.clone(),
619 block_cache: self.block_cache.clone(),
620 execution_state_cache: self.execution_state_cache.clone(),
621 chain_modes: self.chain_modes.clone(),
622 delivery_notifiers: self.delivery_notifiers.clone(),
623 chain_workers: self.chain_workers.clone(),
624 outbound_cross_chain_sender: self.outbound_cross_chain_sender.clone(),
625 }
626 }
627}
628
/// The delivery notifier of each chain, keyed by chain ID.
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
630
631impl<StorageClient> WorkerState<StorageClient>
632where
633 StorageClient: Storage,
634{
635 #[cfg(with_testing)]
637 #[instrument(level = "trace", skip(self))]
638 pub fn with_cross_chain_message_chunk_limit(mut self, limit: usize) -> Self {
639 self.chain_worker_config.cross_chain_message_chunk_limit = limit;
640 self
641 }
642
643 #[cfg(with_testing)]
645 pub fn set_cross_chain_message_chunk_limit(&mut self, limit: usize) {
646 self.chain_worker_config.cross_chain_message_chunk_limit = limit;
647 }
648
649 #[cfg(with_testing)]
650 #[instrument(level = "trace", skip(self, value))]
651 pub fn with_allow_revert_confirm(mut self, value: bool) -> Self {
652 self.chain_worker_config.allow_revert_confirm = value;
653 self
654 }
655
656 #[instrument(level = "trace", skip(self))]
657 pub fn nickname(&self) -> &str {
658 &self.chain_worker_config.nickname
659 }
660
661 #[instrument(level = "trace", skip(self))]
663 #[cfg(not(feature = "test"))]
664 pub(crate) fn storage_client(&self) -> &StorageClient {
665 &self.storage
666 }
667
668 #[instrument(level = "trace", skip(self))]
671 #[cfg(feature = "test")]
672 pub fn storage_client(&self) -> &StorageClient {
673 &self.storage
674 }
675
676 #[instrument(level = "trace", skip(self, certificate))]
677 pub(crate) async fn full_certificate(
678 &self,
679 certificate: LiteCertificate<'_>,
680 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
681 let block = self
682 .block_cache
683 .get(&certificate.value.value_hash)
684 .ok_or(WorkerError::MissingCertificateValue)?;
685 let block = Arc::unwrap_or_clone(block);
686
687 match certificate.value.kind {
688 linera_chain::types::CertificateKind::Confirmed => Ok(Either::Left(
689 certificate
690 .with_value(block)
691 .ok_or(WorkerError::InvalidLiteCertificate)?,
692 )),
693 linera_chain::types::CertificateKind::Validated => {
694 let value = ValidatedBlock::from_hashed(block.into_inner());
695 Ok(Either::Right(
696 certificate
697 .with_value(value)
698 .ok_or(WorkerError::InvalidLiteCertificate)?,
699 ))
700 }
701 _ => Err(WorkerError::InvalidLiteCertificate),
702 }
703 }
704}
705
/// Certificate values that a [`WorkerState`] knows how to process.
///
/// On non-web targets `trait_variant::make(Send)` is applied to the trait.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker`, returning the chain info response together
    /// with the resulting network actions.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
714
impl ProcessableCertificate for ConfirmedBlock {
    /// Delegates to `WorkerState::handle_confirmed_certificate`, passing `None` as its
    /// second argument. The future is boxed before being awaited.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: ConfirmedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
    }
}
723
impl ProcessableCertificate for ValidatedBlock {
    /// Delegates to `WorkerState::handle_validated_certificate`. The future is boxed
    /// before being awaited.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: ValidatedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        Box::pin(worker.handle_validated_certificate(certificate)).await
    }
}
732
impl ProcessableCertificate for Timeout {
    /// Delegates to `WorkerState::handle_timeout_certificate`.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        worker.handle_timeout_certificate(certificate).await
    }
}
741
742impl<StorageClient> WorkerState<StorageClient>
743where
744 StorageClient: Storage + Clone + 'static,
745{
746 #[instrument(level = "trace", skip(storage, chain_worker_config))]
751 pub fn new(
752 storage: StorageClient,
753 chain_worker_config: ChainWorkerConfig,
754 chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
755 ) -> Self {
756 let chain_workers = Arc::new(papaya::HashMap::new());
757 start_sweep(&chain_workers, &chain_worker_config);
758 let block_cache_size = chain_worker_config.block_cache_size;
759 let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
760 WorkerState {
761 storage,
762 chain_worker_config,
763 block_cache: Arc::new(ValueCache::new(
764 block_cache_size,
765 DEFAULT_CLEANUP_INTERVAL_SECS,
766 )),
767 execution_state_cache: (execution_state_cache_size > 0)
768 .then(|| Arc::new(UniqueValueCache::new(execution_state_cache_size))),
769 chain_modes,
770 delivery_notifiers: Arc::default(),
771 chain_workers,
772 outbound_cross_chain_sender: None,
773 }
774 }
775
776 pub fn with_outbound_cross_chain_sender(mut self, sender: OutboundCrossChainSender) -> Self {
781 self.outbound_cross_chain_sender = Some(sender);
782 self
783 }
784
785 #[instrument(level = "trace", skip(self, certificate, notifier))]
786 #[inline]
787 pub async fn fully_handle_certificate_with_notifications<T>(
788 &self,
789 certificate: GenericCertificate<T>,
790 notifier: &impl Notifier,
791 ) -> Result<ChainInfoResponse, WorkerError>
792 where
793 T: ProcessableCertificate,
794 {
795 let notifications = (*notifier).clone();
796 let this = self.clone();
797 linera_base::Task::spawn(async move {
798 let (response, actions) =
799 ProcessableCertificate::process_certificate(&this, certificate).await?;
800 notifications.notify(&actions.notifications);
801 let mut requests = VecDeque::from(actions.cross_chain_requests);
802 while let Some(request) = requests.pop_front() {
803 let actions = this.handle_cross_chain_request(request).await?;
804 requests.extend(actions.cross_chain_requests);
805 notifications.notify(&actions.notifications);
806 }
807 Ok(response)
808 })
809 .await
810 }
811
812 async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
817 where
818 F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
819 Fut: std::future::Future<Output = Result<R, WorkerError>>,
820 {
821 let state = self.get_or_create_chain_worker(chain_id).await?;
822 let state_ref = &state;
823 let result = Box::pin(wrap_future(async move {
824 let guard = handle::read_lock(state_ref).await;
825 guard.check_not_poisoned()?;
826 f(guard).await
827 }))
828 .await;
829 if let Err(error) = &result {
830 if error.must_reload_view() {
831 self.evict_poisoned_worker(chain_id, &state);
832 }
833 }
834 result
835 }
836
837 async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
846 where
847 F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut
848 + linera_base::task::MaybeSend
849 + 'static,
850 Fut: std::future::Future<Output = Result<R, WorkerError>> + linera_base::task::MaybeSend,
851 R: linera_base::task::MaybeSend + 'static,
852 {
853 let state = self.get_or_create_chain_worker(chain_id).await?;
854 let state_for_task = state.clone();
855 let result = Box::pin(wrap_future(linera_base::task::run_detached(async move {
856 let guard = handle::write_lock(&state_for_task).await;
857 guard.check_not_poisoned()?;
858 f(guard).await
859 })))
860 .await;
861 if let Err(error) = &result {
862 if error.must_reload_view() {
863 self.evict_poisoned_worker(chain_id, &state);
864 } else if error.indicates_corrupted_chain_state() {
865 self.spawn_reset_corrupted_chain_state(chain_id, state);
866 }
867 }
868 result
869 }
870
871 fn spawn_reset_corrupted_chain_state(
882 &self,
883 chain_id: ChainId,
884 state: ChainWorkerArc<StorageClient>,
885 ) where
886 StorageClient: Clone,
887 {
888 let this = self.clone();
889 linera_base::Task::spawn(async move {
890 let requests = {
891 let mut guard = handle::write_lock(&state).await;
892 match guard.maybe_reset_corrupted_chain_state().await {
893 Ok(Some(requests)) => requests,
894 Ok(None) => return,
895 Err(error) => {
896 tracing::error!(
897 %chain_id, %error, "Failed to reset corrupted chain state"
898 );
899 return;
900 }
901 }
902 };
903 if let Some(sender) = &this.outbound_cross_chain_sender {
904 for request in requests {
907 sender(request);
908 }
909 } else {
910 let mut queue = VecDeque::from(requests);
914 while let Some(request) = queue.pop_front() {
915 match this.handle_cross_chain_request(request).await {
916 Ok(actions) => queue.extend(actions.cross_chain_requests),
917 Err(error) => {
918 warn!(
919 %chain_id, %error,
920 "Failed to dispatch cross-chain request after \
921 resetting corrupted chain state"
922 );
923 }
924 }
925 }
926 }
927 })
928 .forget();
929 }
930
931 fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
935 tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
936 let pin = self.chain_workers.pin();
937 let weak_poisoned = Arc::downgrade(poisoned);
938 let removed = pin.remove_if(&chain_id, |_key, future| {
939 future
940 .peek()
941 .and_then(|r| r.clone().ok())
942 .is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
943 });
944 if removed.is_err() {
945 tracing::trace!(%chain_id, "Poisoned worker entry already replaced; skipping eviction");
946 }
947 }
948
    /// Returns the worker for `chain_id`, loading it if no live worker is cached.
    ///
    /// The map stores, per chain, a shared future resolving to a weak worker handle. A
    /// caller that finds no usable entry installs a fresh future and becomes the loader;
    /// callers that find a still-pending entry wait for it instead of loading again.
    /// The whole procedure loops until it can return a strong handle or loading fails.
    ///
    /// # Errors
    ///
    /// Returns the error from `load_chain_worker` when this caller is the loader and
    /// loading fails.
    fn get_or_create_chain_worker(
        &self,
        chain_id: ChainId,
    ) -> std::pin::Pin<
        Box<
            impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
        >,
    > {
        Box::pin(wrap_future(async move {
            loop {
                // A fresh channel per attempt: `tx` is used only if this caller ends up
                // being the loader; `shared_rx` is what other callers would wait on.
                let (tx, rx) = oneshot::channel();
                let shared_rx = rx.shared();

                let wait_or_tx = {
                    // The pin guard lives only inside this block, i.e. it is released
                    // before any `.await` below.
                    let pin = self.chain_workers.pin();
                    // Inspect and (maybe) replace the entry in a single `compute` call.
                    match pin.compute(chain_id, |existing| match existing {
                        Some((_, entry)) => match entry.peek() {
                            // Entry resolved to a worker handle.
                            Some(Ok(weak)) => match weak.upgrade() {
                                // Worker still alive: use it.
                                Some(arc) => papaya::Operation::Abort(Ok(arc)),
                                // Worker already dropped: take over as loader.
                                None => papaya::Operation::Insert(shared_rx.clone()),
                            },
                            // The previous loader's sender was dropped without sending
                            // (e.g. its load failed): take over as loader.
                            Some(Err(_)) => papaya::Operation::Insert(shared_rx.clone()),
                            // Someone else is still loading: wait on their future.
                            None => papaya::Operation::Abort(Err(entry.clone())),
                        },
                        // No entry at all: become the loader.
                        None => papaya::Operation::Insert(shared_rx.clone()),
                    }) {
                        papaya::Compute::Aborted(Ok(arc), ..) => return Ok(arc),
                        papaya::Compute::Aborted(Err(wait), ..) => Either::Left(wait),
                        papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => {
                            Either::Right(tx)
                        }
                        // The closure above never requests a removal.
                        papaya::Compute::Removed { .. } => unreachable!(),
                    }
                };

                match wait_or_tx {
                    Either::Left(wait) => {
                        // Wait for the other loader. If it failed, or the worker was
                        // dropped again in the meantime, fall through and retry.
                        if let Ok(weak) = wait.await {
                            if let Some(arc) = weak.upgrade() {
                                return Ok(arc);
                            }
                        }
                    }
                    Either::Right(tx) => {
                        // On failure `?` returns early and drops `tx`; the installed
                        // entry then resolves to an error, which later callers replace
                        // via the `Some(Err(_))` arm above.
                        let worker = self.load_chain_worker(chain_id).await?;
                        // Publish only a weak handle; the strong one goes to the caller.
                        if tx.send(Arc::downgrade(&worker)).is_err() {
                            tracing::error!(%chain_id, "Receiver dropped while loading worker state.");
                            continue;
                        }
                        return Ok(worker);
                    }
                }
            }
        }))
    }
1023
1024 async fn load_chain_worker(
1026 &self,
1027 chain_id: ChainId,
1028 ) -> Result<ChainWorkerArc<StorageClient>, WorkerError> {
1029 let delivery_notifier = self
1030 .delivery_notifiers
1031 .lock()
1032 .unwrap()
1033 .entry(chain_id)
1034 .or_default()
1035 .clone();
1036
1037 let is_tracked = self.chain_modes.as_ref().is_none_or(|chain_modes| {
1041 chain_modes
1042 .read()
1043 .unwrap()
1044 .get(&chain_id)
1045 .is_some_and(ListeningMode::is_full)
1046 });
1047
1048 let (service_runtime_endpoint, service_runtime_task) =
1049 if self.chain_worker_config.long_lived_services {
1050 let actor =
1051 handle::ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
1052 (Some(actor.endpoint), Some(actor.task))
1053 } else {
1054 (None, None)
1055 };
1056
1057 let state = crate::chain_worker::state::ChainWorkerState::load(
1058 self.chain_worker_config.clone(),
1059 self.storage.clone(),
1060 self.block_cache.clone(),
1061 self.execution_state_cache.clone(),
1062 self.chain_modes.clone(),
1063 delivery_notifier,
1064 chain_id,
1065 service_runtime_endpoint,
1066 service_runtime_task,
1067 )
1068 .await?;
1069
1070 Ok(handle::create_chain_worker(
1071 state,
1072 is_tracked,
1073 &self.chain_worker_config,
1074 ))
1075 }
1076
    /// Stages the execution of `block` on its chain.
    ///
    /// Thin wrapper: takes the write lock of the chain worker for `block.chain_id`
    /// through `chain_write` and delegates to the worker's `stage_block_execution`,
    /// forwarding `round`, `published_blobs` and `policy`.
    // NOTE(review): the meaning of each element of the returned tuple is defined by the
    // chain worker and is not visible here.
    #[instrument(level = "trace", skip(self, block))]
    pub async fn stage_block_execution(
        &self,
        block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
        policy: BundleExecutionPolicy,
    ) -> Result<
        (
            ProposedBlock,
            Block,
            ChainInfoResponse,
            ResourceTracker,
            HashSet<ChainId>,
        ),
        WorkerError,
    > {
        let chain_id = block.chain_id;
        self.chain_write(chain_id, move |mut guard| async move {
            guard
                .stage_block_execution(block, round, &published_blobs, policy)
                .await
        })
        .await
    }
1106
    /// Runs `query` against an application on `chain_id`, optionally at `block_hash`,
    /// returning the query outcome together with a block height.
    ///
    /// Delegates to the chain worker's `query_application` through `chain_write`, so it
    /// takes the worker's write lock even though it is a query.
    // NOTE(review): presumably the write lock is needed because the query uses mutable
    // worker state — confirm.
    #[instrument(level = "trace", skip(self, chain_id, query))]
    pub async fn query_application(
        &self,
        chain_id: ChainId,
        query: Query,
        block_hash: Option<CryptoHash>,
    ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
        self.chain_write(chain_id, move |mut guard| async move {
            guard.query_application(query, block_hash).await
        })
        .await
    }
1123
    /// Returns the description of `application_id` as seen by chain `chain_id`.
    ///
    /// Takes a read lock via `handle::read_lock_initialized` and calls the worker's
    /// `describe_application_readonly`.
    // NOTE(review): unlike `chain_read`, this path does not call `check_not_poisoned`
    // here and does not evict the worker on a `must_reload_view` error; confirm whether
    // `read_lock_initialized` covers that or whether this is intentional.
    #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
        nickname = %self.nickname(),
        chain_id = %chain_id,
        application_id = %application_id
    ))]
    pub async fn describe_application(
        &self,
        chain_id: ChainId,
        application_id: ApplicationId,
    ) -> Result<ApplicationDescription, WorkerError> {
        let state = self.get_or_create_chain_worker(chain_id).await?;
        let guard = handle::read_lock_initialized(&state).await?;
        guard.describe_application_readonly(application_id).await
    }
1138
    /// Processes a confirmed block certificate on the chain named in its block header.
    ///
    /// Delegates to the chain worker's `process_confirmed_block` under the write lock
    /// taken by `chain_write`, forwarding the optional
    /// `notify_when_messages_are_delivered` sender unchanged.
    #[instrument(
        level = "trace",
        skip(self, certificate, notify_when_messages_are_delivered),
        fields(
            nickname = %self.nickname(),
            chain_id = %certificate.block().header.chain_id,
            block_height = %certificate.block().header.height
        )
    )]
    async fn process_confirmed_block(
        &self,
        certificate: ConfirmedBlockCertificate,
        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
        let chain_id = certificate.block().header.chain_id;
        self.chain_write(chain_id, move |mut guard| async move {
            guard
                .process_confirmed_block(certificate, notify_when_messages_are_delivered)
                .await
        })
        .await
    }
1162
1163 #[instrument(level = "trace", skip(self, certificate), fields(
1165 nickname = %self.nickname(),
1166 chain_id = %certificate.block().header.chain_id,
1167 block_height = %certificate.block().header.height
1168 ))]
1169 async fn process_validated_block(
1170 &self,
1171 certificate: ValidatedBlockCertificate,
1172 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1173 let chain_id = certificate.block().header.chain_id;
1174 self.chain_write(chain_id, move |mut guard| async move {
1175 guard.process_validated_block(certificate).await
1176 })
1177 .await
1178 }
1179
1180 #[instrument(level = "trace", skip(self, certificate), fields(
1182 nickname = %self.nickname(),
1183 chain_id = %certificate.value().chain_id(),
1184 height = %certificate.value().height()
1185 ))]
1186 async fn process_timeout(
1187 &self,
1188 certificate: TimeoutCertificate,
1189 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1190 let chain_id = certificate.value().chain_id();
1191 self.chain_write(chain_id, move |mut guard| async move {
1192 guard.process_timeout(certificate).await
1193 })
1194 .await
1195 }
1196
1197 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
1198 nickname = %self.nickname(),
1199 origin = %origin,
1200 recipient = %recipient,
1201 num_bundles = %bundles.len()
1202 ))]
1203 async fn process_cross_chain_update(
1204 &self,
1205 origin: ChainId,
1206 recipient: ChainId,
1207 bundles: Vec<(Epoch, MessageBundle)>,
1208 previous_height: Option<BlockHeight>,
1209 ) -> Result<CrossChainUpdateResult, WorkerError> {
1210 self.chain_write(recipient, move |mut guard| async move {
1211 guard
1212 .process_cross_chain_update(origin, bundles, previous_height)
1213 .await
1214 })
1215 .await
1216 }
1217
1218 #[instrument(level = "trace", skip(self, chain_id, height), fields(
1220 nickname = %self.nickname(),
1221 chain_id = %chain_id,
1222 height = %height
1223 ))]
1224 #[cfg(with_testing)]
1225 pub async fn read_certificate(
1226 &self,
1227 chain_id: ChainId,
1228 height: BlockHeight,
1229 ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, WorkerError> {
1230 let state = self.get_or_create_chain_worker(chain_id).await?;
1231 let guard = handle::read_lock_initialized(&state).await?;
1232 guard.read_certificate(height).await
1233 }
1234
1235 #[instrument(level = "trace", skip(self), fields(
1241 nickname = %self.nickname(),
1242 chain_id = %chain_id
1243 ))]
1244 pub async fn chain_state_view(
1245 &self,
1246 chain_id: ChainId,
1247 ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1248 let state = self.get_or_create_chain_worker(chain_id).await?;
1249 let guard = handle::read_lock(&state).await;
1250 Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1251 guard,
1252 |s| s.chain(),
1253 )))
1254 }
1255
1256 #[instrument(skip_all, fields(
1257 nick = self.nickname(),
1258 chain_id = format!("{:.8}", proposal.content.block.chain_id),
1259 height = %proposal.content.block.height,
1260 ))]
1261 pub async fn handle_block_proposal(
1262 &self,
1263 proposal: BlockProposal,
1264 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1265 trace!("{} <-- {:?}", self.nickname(), proposal);
1266 #[cfg(with_metrics)]
1267 let round = proposal.content.round;
1268
1269 let chain_id = proposal.content.block.chain_id;
1270 let now = self.storage.clock().current_time();
1272 let block_timestamp = proposal.content.block.timestamp;
1273 let delta = block_timestamp.delta_since(now);
1274 let grace_period = TimeDelta::from_micros(
1275 u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1276 .unwrap_or(u64::MAX),
1277 );
1278 if delta > TimeDelta::ZERO && delta <= grace_period {
1279 self.storage.clock().sleep_until(block_timestamp).await;
1280 }
1281
1282 let response = self
1283 .chain_write(chain_id, move |mut guard| async move {
1284 guard.handle_block_proposal(proposal).await
1285 })
1286 .await?;
1287 #[cfg(with_metrics)]
1288 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1289 .with_label_values(&[round.type_name()])
1290 .observe(round.number() as f64);
1291 Ok(response)
1292 }
1293
1294 #[instrument(skip_all, fields(
1297 chain_id = %certificate.value.chain_id,
1298 hash = %certificate.value.value_hash,
1299 ))]
1300 pub async fn handle_lite_certificate(
1301 &self,
1302 certificate: LiteCertificate<'_>,
1303 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1304 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1305 match self.full_certificate(certificate).await? {
1306 Either::Left(confirmed) => {
1307 Box::pin(
1308 self.handle_confirmed_certificate(
1309 confirmed,
1310 notify_when_messages_are_delivered,
1311 ),
1312 )
1313 .await
1314 }
1315 Either::Right(validated) => {
1316 if let Some(notifier) = notify_when_messages_are_delivered {
1317 if let Err(()) = notifier.send(()) {
1319 warn!("Failed to notify message delivery to caller");
1320 }
1321 }
1322 Box::pin(self.handle_validated_certificate(validated)).await
1323 }
1324 }
1325 }
1326
1327 #[instrument(skip_all, fields(
1329 nick = self.nickname(),
1330 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1331 height = %certificate.block().header.height,
1332 ))]
1333 pub async fn handle_confirmed_certificate(
1334 &self,
1335 certificate: ConfirmedBlockCertificate,
1336 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1337 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1338 trace!("{} <-- {:?}", self.nickname(), certificate);
1339 #[cfg(with_metrics)]
1340 let metrics_data = metrics::MetricsData::new(&certificate);
1341
1342 #[allow(unused_variables)]
1343 let (info, actions, outcome) =
1344 Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1345 .await?;
1346
1347 #[cfg(with_metrics)]
1348 if matches!(outcome, BlockOutcome::Processed) {
1349 metrics_data.record();
1350 }
1351 Ok((info, actions))
1352 }
1353
1354 #[instrument(skip_all, fields(
1356 nick = self.nickname(),
1357 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1358 height = %certificate.block().header.height,
1359 ))]
1360 pub async fn handle_validated_certificate(
1361 &self,
1362 certificate: ValidatedBlockCertificate,
1363 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1364 trace!("{} <-- {:?}", self.nickname(), certificate);
1365
1366 #[cfg(with_metrics)]
1367 let round = certificate.round;
1368 #[cfg(with_metrics)]
1369 let cert_str = certificate.inner().to_log_str();
1370
1371 #[allow(unused_variables)]
1372 let (info, actions, outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1373 #[cfg(with_metrics)]
1374 {
1375 if matches!(outcome, BlockOutcome::Processed) {
1376 metrics::NUM_ROUNDS_IN_CERTIFICATE
1377 .with_label_values(&[cert_str, round.type_name()])
1378 .observe(round.number() as f64);
1379 }
1380 }
1381 Ok((info, actions))
1382 }
1383
1384 #[instrument(skip_all, fields(
1386 nick = self.nickname(),
1387 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1388 height = %certificate.inner().height(),
1389 ))]
1390 pub async fn handle_timeout_certificate(
1391 &self,
1392 certificate: TimeoutCertificate,
1393 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1394 trace!("{} <-- {:?}", self.nickname(), certificate);
1395 self.process_timeout(certificate).await
1396 }
1397
1398 #[instrument(skip_all, fields(
1399 nick = self.nickname(),
1400 chain_id = format!("{:.8}", query.chain_id)
1401 ))]
1402 pub async fn handle_chain_info_query(
1403 &self,
1404 query: ChainInfoQuery,
1405 ) -> Result<ChainInfoResponse, WorkerError> {
1406 trace!("{} <-- {:?}", self.nickname(), query);
1407 #[cfg(with_metrics)]
1408 metrics::CHAIN_INFO_QUERIES.inc();
1409 let chain_id = query.chain_id;
1410 let result = self
1411 .chain_write(chain_id, move |mut guard| async move {
1412 guard.handle_chain_info_query(query).await
1413 })
1414 .await;
1415 trace!("{} --> {:?}", self.nickname(), result);
1416 result
1417 }
1418
1419 #[instrument(skip_all, fields(
1420 nick = self.nickname(),
1421 chain_id = format!("{:.8}", chain_id)
1422 ))]
1423 pub async fn download_pending_blob(
1424 &self,
1425 chain_id: ChainId,
1426 blob_id: BlobId,
1427 ) -> Result<Arc<Blob>, WorkerError> {
1428 trace!("{} <-- download_pending_blob({blob_id:8})", self.nickname());
1429 let result = self
1430 .chain_read(chain_id, |guard| async move {
1431 guard.download_pending_blob(blob_id).await
1432 })
1433 .await;
1434 trace!(
1435 "{} --> {:?}",
1436 self.nickname(),
1437 result.as_ref().map(|_| blob_id)
1438 );
1439 result
1440 }
1441
1442 #[instrument(skip_all, fields(
1443 nick = self.nickname(),
1444 chain_id = format!("{:.8}", chain_id)
1445 ))]
1446 pub async fn handle_pending_blob(
1447 &self,
1448 chain_id: ChainId,
1449 blob: Blob,
1450 ) -> Result<ChainInfoResponse, WorkerError> {
1451 let blob_id = blob.id();
1452 trace!("{} <-- handle_pending_blob({blob_id:8})", self.nickname());
1453 let result = self
1454 .chain_write(chain_id, move |mut guard| async move {
1455 guard.handle_pending_blob(blob).await
1456 })
1457 .await;
1458 trace!(
1459 "{} --> {:?}",
1460 self.nickname(),
1461 result.as_ref().map(|_| blob_id)
1462 );
1463 result
1464 }
1465
1466 #[instrument(skip_all, fields(
1467 nick = self.nickname(),
1468 chain_id = format!("{:.8}", request.target_chain_id())
1469 ))]
1470 pub async fn handle_cross_chain_request(
1471 &self,
1472 request: CrossChainRequest,
1473 ) -> Result<NetworkActions, WorkerError> {
1474 trace!("{} <-- {:?}", self.nickname(), request);
1475 match request {
1476 CrossChainRequest::UpdateRecipient {
1477 sender,
1478 recipient,
1479 bundles,
1480 previous_height,
1481 } => {
1482 let mut actions = NetworkActions::default();
1483 let origin = sender;
1484 match self
1485 .process_cross_chain_update(origin, recipient, bundles, previous_height)
1486 .await?
1487 {
1488 CrossChainUpdateResult::NothingToDo => {}
1489 CrossChainUpdateResult::Updated(height) => {
1490 actions.notifications.push(Notification {
1491 chain_id: recipient,
1492 reason: Reason::NewIncomingBundle { origin, height },
1493 });
1494 actions.cross_chain_requests.push(
1495 CrossChainRequest::ConfirmUpdatedRecipient {
1496 sender,
1497 recipient,
1498 latest_height: height,
1499 },
1500 );
1501 }
1502 CrossChainUpdateResult::GapDetected {
1503 origin,
1504 retransmit_from,
1505 } => {
1506 actions
1507 .cross_chain_requests
1508 .push(CrossChainRequest::RevertConfirm {
1509 sender: origin,
1510 recipient,
1511 retransmit_from,
1512 });
1513 }
1514 }
1515 Ok(actions)
1516 }
1517 CrossChainRequest::ConfirmUpdatedRecipient {
1518 sender,
1519 recipient,
1520 latest_height,
1521 } => {
1522 let actions = self
1523 .chain_write(sender, move |mut guard| async move {
1524 guard
1525 .confirm_updated_recipient(recipient, latest_height)
1526 .await
1527 })
1528 .await?;
1529 Ok(actions)
1530 }
1531 CrossChainRequest::RevertConfirm {
1532 sender,
1533 recipient,
1534 retransmit_from,
1535 } => {
1536 self.chain_write(sender, move |mut guard| async move {
1537 guard
1538 .handle_revert_confirm(recipient, retransmit_from)
1539 .await
1540 })
1541 .await
1542 }
1543 }
1544 }
1545
1546 #[instrument(skip_all, fields(
1548 nickname = %self.nickname(),
1549 chain_id = %chain_id,
1550 num_trackers = %new_trackers.len()
1551 ))]
1552 pub async fn update_received_certificate_trackers(
1553 &self,
1554 chain_id: ChainId,
1555 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1556 ) -> Result<(), WorkerError> {
1557 self.chain_write(chain_id, move |mut guard| async move {
1558 guard
1559 .update_received_certificate_trackers(new_trackers)
1560 .await
1561 })
1562 .await
1563 }
1564
1565 #[instrument(skip_all, fields(
1567 nickname = %self.nickname(),
1568 chain_id = %chain_id,
1569 start = %start,
1570 end = %end
1571 ))]
1572 pub async fn get_preprocessed_block_hashes(
1573 &self,
1574 chain_id: ChainId,
1575 start: BlockHeight,
1576 end: BlockHeight,
1577 ) -> Result<Vec<CryptoHash>, WorkerError> {
1578 self.chain_read(chain_id, |guard| async move {
1579 guard.get_preprocessed_block_hashes(start, end).await
1580 })
1581 .await
1582 }
1583
1584 #[instrument(skip_all, fields(
1586 nickname = %self.nickname(),
1587 chain_id = %chain_id,
1588 origin = %origin
1589 ))]
1590 pub async fn get_inbox_next_height(
1591 &self,
1592 chain_id: ChainId,
1593 origin: ChainId,
1594 ) -> Result<BlockHeight, WorkerError> {
1595 self.chain_read(chain_id, |guard| async move {
1596 guard.get_inbox_next_height(origin).await
1597 })
1598 .await
1599 }
1600
1601 #[instrument(skip_all, fields(
1604 nickname = %self.nickname(),
1605 chain_id = %chain_id,
1606 num_blob_ids = %blob_ids.len()
1607 ))]
1608 pub async fn get_locking_blobs(
1609 &self,
1610 chain_id: ChainId,
1611 blob_ids: Vec<BlobId>,
1612 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1613 self.chain_read(chain_id, |guard| async move {
1614 guard.get_locking_blobs(blob_ids).await
1615 })
1616 .await
1617 }
1618
1619 pub async fn get_block_hashes(
1621 &self,
1622 chain_id: ChainId,
1623 heights: Vec<BlockHeight>,
1624 ) -> Result<Vec<CryptoHash>, WorkerError> {
1625 self.chain_read(chain_id, |guard| async move {
1626 guard.get_block_hashes(heights).await
1627 })
1628 .await
1629 }
1630
1631 pub async fn get_proposed_blobs(
1633 &self,
1634 chain_id: ChainId,
1635 blob_ids: Vec<BlobId>,
1636 ) -> Result<Vec<Blob>, WorkerError> {
1637 self.chain_read(chain_id, |guard| async move {
1638 guard.get_proposed_blobs(blob_ids).await
1639 })
1640 .await
1641 }
1642
1643 pub async fn get_event_subscriptions(
1645 &self,
1646 chain_id: ChainId,
1647 ) -> Result<EventSubscriptionsResult, WorkerError> {
1648 self.chain_read(chain_id, |guard| async move {
1649 guard.get_event_subscriptions().await
1650 })
1651 .await
1652 }
1653
1654 pub async fn get_next_expected_event(
1656 &self,
1657 chain_id: ChainId,
1658 stream_id: StreamId,
1659 ) -> Result<Option<u32>, WorkerError> {
1660 self.chain_read(chain_id, |guard| async move {
1661 guard.get_next_expected_event(stream_id).await
1662 })
1663 .await
1664 }
1665
1666 pub async fn next_expected_events(
1668 &self,
1669 chain_id: ChainId,
1670 stream_ids: Vec<StreamId>,
1671 ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1672 self.chain_read(chain_id, |guard| async move {
1673 guard.get_next_expected_events(stream_ids).await
1674 })
1675 .await
1676 }
1677
1678 pub async fn get_received_certificate_trackers(
1680 &self,
1681 chain_id: ChainId,
1682 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1683 self.chain_read(chain_id, |guard| async move {
1684 guard.get_received_certificate_trackers().await
1685 })
1686 .await
1687 }
1688
1689 pub async fn cross_chain_network_actions(
1693 &self,
1694 chain_id: ChainId,
1695 ) -> Result<NetworkActions, WorkerError> {
1696 self.chain_read(chain_id, |guard| async move {
1697 guard.cross_chain_network_actions().await
1698 })
1699 .await
1700 }
1701
1702 pub async fn get_tip_state_and_outbox_info(
1704 &self,
1705 chain_id: ChainId,
1706 receiver_id: ChainId,
1707 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1708 self.chain_read(chain_id, |guard| async move {
1709 guard.get_tip_state_and_outbox_info(receiver_id).await
1710 })
1711 .await
1712 }
1713
1714 pub async fn get_next_height_to_preprocess(
1716 &self,
1717 chain_id: ChainId,
1718 ) -> Result<BlockHeight, WorkerError> {
1719 self.chain_read(chain_id, |guard| async move {
1720 guard.get_next_height_to_preprocess().await
1721 })
1722 .await
1723 }
1724}
1725
/// Helpers that are only compiled with `with_testing`.
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage + Clone + 'static,
{
    /// Returns the public key of the key pair held in the chain worker configuration.
    ///
    /// # Panics
    ///
    /// Panics if the configuration has no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain its public key",
        );
        key_pair.public()
    }
}