1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14 data_types::{
15 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, TimeDelta,
16 Timestamp,
17 },
18 doc_scalar,
19 hashed::Hashed,
20 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
21};
22#[cfg(with_testing)]
23use linera_chain::ChainExecutionContext;
24use linera_chain::{
25 data_types::{
26 BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, MessageBundle, ProposedBlock,
27 },
28 types::{
29 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
30 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
31 },
32 ChainError, ChainStateView,
33};
34use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
35use linera_storage::{Clock as _, Storage};
36use linera_views::{context::InactiveContext, ViewError};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39use tokio::sync::{oneshot, OwnedRwLockReadGuard};
40use tracing::{instrument, trace, warn};
41
42pub struct ChainStateViewReadGuard<S: Storage>(
49 OwnedRwLockReadGuard<ChainWorkerState<S>, ChainStateView<S::Context>>,
50);
51
52impl<S: Storage> std::ops::Deref for ChainStateViewReadGuard<S> {
53 type Target = ChainStateView<S::Context>;
54
55 fn deref(&self) -> &Self::Target {
56 &self.0
57 }
58}
59
60pub(crate) use crate::chain_worker::EventSubscriptionsResult;
62use crate::{
63 chain_worker::{
64 handle::{self, ServiceRuntimeActor},
65 state::ChainWorkerState,
66 BlockOutcome, ChainWorkerConfig, DeliveryNotifier,
67 },
68 client::ListeningMode,
69 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
70 notifier::Notifier,
71 value_cache::ValueCache,
72};
73
74#[cfg(test)]
75#[path = "unit_tests/worker_tests.rs"]
76mod worker_tests;
77
78#[cfg(not(web))]
81pub(crate) fn wrap_future<F: std::future::Future>(f: F) -> sync_wrapper::SyncFuture<F> {
82 sync_wrapper::SyncFuture::new(f)
83}
84
85#[cfg(web)]
/// Returns `f` unchanged; the web build needs no wrapper around the future.
pub(crate) fn wrap_future<F>(f: F) -> F
where
    F: std::future::Future,
{
    f
}
91
/// Prometheus metrics recorded by the worker (compiled only with `with_metrics`).
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of the round number of processed certificates, labelled by certificate
    /// value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Histogram of the round number of handled block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions in recorded confirmed blocks.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles in recorded confirmed blocks.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of incoming messages in recorded confirmed blocks.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations in recorded confirmed blocks.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per recorded block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per recorded block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per recorded block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Running total of recorded blocks.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Per-validator count of signatures found on recorded confirmed block certificates.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Counter of processed chain info queries.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Snapshot of the numbers extracted from a confirmed block certificate.
    ///
    /// The snapshot is taken from a borrowed certificate via [`MetricsData::new`] and
    /// can be turned into metric updates later with [`MetricsData::record`].
    pub struct MetricsData {
        certificate_log_str: &'static str,
        round_type: &'static str,
        round_number: u32,
        confirmed_transactions: u64,
        confirmed_incoming_bundles: u64,
        confirmed_incoming_messages: u64,
        confirmed_operations: u64,
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Collects labels and counts from `certificate` without updating any metric.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let body = &certificate.block().body;
            let round = &certificate.round;

            let confirmed_incoming_messages = body
                .incoming_bundles()
                .map(|bundle| bundle.messages().count())
                .sum::<usize>() as u64;
            let validators_with_signatures = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: round.type_name(),
                round_number: round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: body.incoming_bundles().count() as u64,
                confirmed_incoming_messages,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures,
            }
        }

        /// Pushes the collected numbers into the Prometheus metrics.
        ///
        /// The per-kind counters (bundles, messages, operations) are only touched when
        /// the block contained at least one transaction.
        pub fn record(self) {
            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(f64::from(round_number));
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                if confirmed_incoming_bundles > 0 {
                    INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
                }
                if confirmed_incoming_messages > 0 {
                    INCOMING_MESSAGE_COUNT.inc_by(confirmed_incoming_messages);
                }
                if confirmed_operations > 0 {
                    OPERATION_COUNT.inc_by(confirmed_operations);
                }
            }

            // One increment per validator whose signature is on the certificate.
            for signer in validators_with_signatures {
                CERTIFICATES_SIGNED.with_label_values(&[&signer]).inc();
            }
        }
    }
}
245
246#[derive(Default, Debug)]
248pub struct NetworkActions {
249 pub cross_chain_requests: Vec<CrossChainRequest>,
251 pub notifications: Vec<Notification>,
253}
254
255impl NetworkActions {
256 pub fn extend(&mut self, other: NetworkActions) {
257 self.cross_chain_requests.extend(other.cross_chain_requests);
258 self.notifications.extend(other.notifications);
259 }
260}
261
262#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
263pub struct Notification {
265 pub chain_id: ChainId,
266 pub reason: Reason,
267}
268
269doc_scalar!(
270 Notification,
271 "Notify that a chain has a new certified block or a new message"
272);
273
274#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
275pub enum Reason {
277 NewBlock {
278 height: BlockHeight,
279 hash: CryptoHash,
280 },
281 NewEvents {
282 height: BlockHeight,
283 hash: CryptoHash,
284 event_streams: BTreeSet<StreamId>,
285 },
286 NewIncomingBundle {
287 origin: ChainId,
288 height: BlockHeight,
289 },
290 NewRound {
291 height: BlockHeight,
292 round: Round,
293 },
294 BlockExecuted {
295 height: BlockHeight,
296 hash: CryptoHash,
297 },
298}
299
300#[derive(Debug, Error)]
302pub enum WorkerError {
303 #[error(transparent)]
304 CryptoError(#[from] CryptoError),
305
306 #[error(transparent)]
307 ArithmeticError(#[from] ArithmeticError),
308
309 #[error(transparent)]
310 ViewError(#[from] ViewError),
311
312 #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
313 ReadCertificatesError(Vec<CryptoHash>),
314
315 #[error(transparent)]
316 ChainError(#[from] Box<ChainError>),
317
318 #[error(transparent)]
319 BcsError(#[from] bcs::Error),
320
321 #[error("Block was not signed by an authorized owner")]
323 InvalidOwner,
324
325 #[error("Operations in the block are not authenticated by the proper owner: {0}")]
326 InvalidSigner(AccountOwner),
327
328 #[error(
330 "Chain is expecting a next block at height {expected_block_height} but the given block \
331 is at height {found_block_height} instead"
332 )]
333 UnexpectedBlockHeight {
334 expected_block_height: BlockHeight,
335 found_block_height: BlockHeight,
336 },
337 #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
338 InvalidEpoch {
339 chain_id: ChainId,
340 chain_epoch: Epoch,
341 epoch: Epoch,
342 },
343
344 #[error("Events not found: {0:?}")]
345 EventsNotFound(Vec<EventId>),
346
347 #[error("Invalid cross-chain request")]
349 InvalidCrossChainRequest,
350 #[error("The block does not contain the hash that we expected for the previous block")]
351 InvalidBlockChaining,
352 #[error(
353 "The given outcome is not what we computed after executing the block.\n\
354 Computed: {computed:#?}\n\
355 Submitted: {submitted:#?}"
356 )]
357 IncorrectOutcome {
358 computed: Box<BlockExecutionOutcome>,
359 submitted: Box<BlockExecutionOutcome>,
360 },
361 #[error(
362 "Block timestamp ({block_timestamp}) is further in the future from local time \
363 ({local_time}) than block time grace period ({block_time_grace_period:?})"
364 )]
365 InvalidTimestamp {
366 block_timestamp: Timestamp,
367 local_time: Timestamp,
368 block_time_grace_period: Duration,
369 },
370 #[error("We don't have the value for the certificate.")]
371 MissingCertificateValue,
372 #[error("The hash certificate doesn't match its value.")]
373 InvalidLiteCertificate,
374 #[error("Fast blocks cannot query oracles")]
375 FastBlockUsingOracles,
376 #[error("Blobs not found: {0:?}")]
377 BlobsNotFound(Vec<BlobId>),
378 #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
379 ConfirmedLogEntryNotFound {
380 height: BlockHeight,
381 chain_id: ChainId,
382 },
383 #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
384 PreprocessedBlocksEntryNotFound {
385 height: BlockHeight,
386 chain_id: ChainId,
387 },
388 #[error("The block proposal is invalid: {0}")]
389 InvalidBlockProposal(String),
390 #[error("Blob was not required by any pending block")]
391 UnexpectedBlob,
392 #[error("Number of published blobs per block must not exceed {0}")]
393 TooManyPublishedBlobs(u64),
394 #[error("Missing network description")]
395 MissingNetworkDescription,
396 #[error("thread error: {0}")]
397 Thread(#[from] web_thread_pool::Error),
398}
399
400impl WorkerError {
401 pub fn is_local(&self) -> bool {
405 match self {
406 WorkerError::CryptoError(_)
407 | WorkerError::ArithmeticError(_)
408 | WorkerError::InvalidOwner
409 | WorkerError::InvalidSigner(_)
410 | WorkerError::UnexpectedBlockHeight { .. }
411 | WorkerError::InvalidEpoch { .. }
412 | WorkerError::EventsNotFound(_)
413 | WorkerError::InvalidBlockChaining
414 | WorkerError::IncorrectOutcome { .. }
415 | WorkerError::InvalidTimestamp { .. }
416 | WorkerError::MissingCertificateValue
417 | WorkerError::InvalidLiteCertificate
418 | WorkerError::FastBlockUsingOracles
419 | WorkerError::BlobsNotFound(_)
420 | WorkerError::InvalidBlockProposal(_)
421 | WorkerError::UnexpectedBlob
422 | WorkerError::TooManyPublishedBlobs(_)
423 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
424 WorkerError::BcsError(_)
425 | WorkerError::InvalidCrossChainRequest
426 | WorkerError::ViewError(_)
427 | WorkerError::ConfirmedLogEntryNotFound { .. }
428 | WorkerError::PreprocessedBlocksEntryNotFound { .. }
429 | WorkerError::MissingNetworkDescription
430 | WorkerError::Thread(_)
431 | WorkerError::ReadCertificatesError(_) => true,
432 WorkerError::ChainError(chain_error) => chain_error.is_local(),
433 }
434 }
435}
436
437impl From<ChainError> for WorkerError {
438 #[instrument(level = "trace", skip(chain_error))]
439 fn from(chain_error: ChainError) -> Self {
440 match chain_error {
441 ChainError::ExecutionError(execution_error, context) => match *execution_error {
442 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
443 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
444 _ => Self::ChainError(Box::new(ChainError::ExecutionError(
445 execution_error,
446 context,
447 ))),
448 },
449 error => Self::ChainError(Box::new(error)),
450 }
451 }
452}
453
#[cfg(with_testing)]
impl WorkerError {
    /// Test helper: unwraps the [`ExecutionError`] nested inside this error.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not a [`WorkerError::ChainError`] wrapping a
    /// [`ChainError::ExecutionError`], or if the error's context differs from
    /// `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let WorkerError::ChainError(boxed_chain_error) = self else {
            panic!("Expected an `ExecutionError`. Got: {self:#?}");
        };

        match *boxed_chain_error {
            ChainError::ExecutionError(execution_error, context) => {
                assert_eq!(context, expected_context);
                *execution_error
            }
            chain_error => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        }
    }
}
475
476type ChainWorkerArc<S> = Arc<tokio::sync::RwLock<ChainWorkerState<S>>>;
477type ChainWorkerMap<S> =
478 Arc<papaya::HashMap<ChainId, std::sync::Weak<tokio::sync::RwLock<ChainWorkerState<S>>>>>;
479
480fn start_sweep<S: Storage + Clone + 'static>(
484 chain_workers: &ChainWorkerMap<S>,
485 config: &ChainWorkerConfig,
486) {
487 let interval = match (config.ttl, config.sender_chain_ttl) {
490 (None, None) => return,
491 (Some(d), None) | (None, Some(d)) => d,
492 (Some(a), Some(b)) => a.min(b),
493 };
494 let weak_map = Arc::downgrade(chain_workers);
495 linera_base::Task::spawn(async move {
496 loop {
497 linera_base::time::timer::sleep(interval).await;
498 let Some(map) = weak_map.upgrade() else {
499 break;
500 };
501 let guard = map.guard();
502 let dead_keys: Vec<ChainId> = map
503 .pin_owned()
504 .iter()
505 .filter(|(_, weak)| weak.strong_count() == 0)
506 .map(|(chain_id, _)| *chain_id)
507 .collect();
508 for chain_id in dead_keys {
509 map.remove(&chain_id, &guard);
510 }
511 drop(guard);
512 drop(map);
513 }
514 })
515 .forget();
516}
517
518pub struct WorkerState<StorageClient: Storage> {
520 storage: StorageClient,
522 chain_worker_config: ChainWorkerConfig,
524 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
525 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
526 chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
528 delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
531 chain_workers: ChainWorkerMap<StorageClient>,
535}
536
537impl<StorageClient> Clone for WorkerState<StorageClient>
538where
539 StorageClient: Storage + Clone,
540{
541 fn clone(&self) -> Self {
542 WorkerState {
543 storage: self.storage.clone(),
544 chain_worker_config: self.chain_worker_config.clone(),
545 block_cache: self.block_cache.clone(),
546 execution_state_cache: self.execution_state_cache.clone(),
547 chain_modes: self.chain_modes.clone(),
548 delivery_notifiers: self.delivery_notifiers.clone(),
549 chain_workers: self.chain_workers.clone(),
550 }
551 }
552}
553
554pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
555
556impl<StorageClient> WorkerState<StorageClient>
557where
558 StorageClient: Storage,
559{
560 #[instrument(level = "trace", skip(self))]
561 pub fn nickname(&self) -> &str {
562 &self.chain_worker_config.nickname
563 }
564
565 #[instrument(level = "trace", skip(self))]
567 #[cfg(not(feature = "test"))]
568 pub(crate) fn storage_client(&self) -> &StorageClient {
569 &self.storage
570 }
571
572 #[instrument(level = "trace", skip(self))]
575 #[cfg(feature = "test")]
576 pub fn storage_client(&self) -> &StorageClient {
577 &self.storage
578 }
579
580 #[instrument(level = "trace", skip(self, certificate))]
581 pub(crate) async fn full_certificate(
582 &self,
583 certificate: LiteCertificate<'_>,
584 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
585 let block = self
586 .block_cache
587 .get(&certificate.value.value_hash)
588 .ok_or(WorkerError::MissingCertificateValue)?;
589
590 match certificate.value.kind {
591 linera_chain::types::CertificateKind::Confirmed => {
592 let value = ConfirmedBlock::from_hashed(block);
593 Ok(Either::Left(
594 certificate
595 .with_value(value)
596 .ok_or(WorkerError::InvalidLiteCertificate)?,
597 ))
598 }
599 linera_chain::types::CertificateKind::Validated => {
600 let value = ValidatedBlock::from_hashed(block);
601 Ok(Either::Right(
602 certificate
603 .with_value(value)
604 .ok_or(WorkerError::InvalidLiteCertificate)?,
605 ))
606 }
607 _ => Err(WorkerError::InvalidLiteCertificate),
608 }
609 }
610}
611
612#[allow(async_fn_in_trait)]
613#[cfg_attr(not(web), trait_variant::make(Send))]
614pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
615 async fn process_certificate<S: Storage + Clone + 'static>(
616 worker: &WorkerState<S>,
617 certificate: GenericCertificate<Self>,
618 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
619}
620
621impl ProcessableCertificate for ConfirmedBlock {
622 async fn process_certificate<S: Storage + Clone + 'static>(
623 worker: &WorkerState<S>,
624 certificate: ConfirmedBlockCertificate,
625 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
626 Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
627 }
628}
629
630impl ProcessableCertificate for ValidatedBlock {
631 async fn process_certificate<S: Storage + Clone + 'static>(
632 worker: &WorkerState<S>,
633 certificate: ValidatedBlockCertificate,
634 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
635 Box::pin(worker.handle_validated_certificate(certificate)).await
636 }
637}
638
639impl ProcessableCertificate for Timeout {
640 async fn process_certificate<S: Storage + Clone + 'static>(
641 worker: &WorkerState<S>,
642 certificate: TimeoutCertificate,
643 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
644 worker.handle_timeout_certificate(certificate).await
645 }
646}
647
648impl<StorageClient> WorkerState<StorageClient>
649where
650 StorageClient: Storage + Clone + 'static,
651{
652 #[instrument(level = "trace", skip(storage, chain_worker_config))]
657 pub fn new(
658 storage: StorageClient,
659 chain_worker_config: ChainWorkerConfig,
660 chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
661 ) -> Self {
662 let chain_workers = Arc::new(papaya::HashMap::new());
663 start_sweep(&chain_workers, &chain_worker_config);
664 let block_cache_size = chain_worker_config.block_cache_size;
665 let execution_state_cache_size = chain_worker_config.execution_state_cache_size;
666 WorkerState {
667 storage,
668 chain_worker_config,
669 block_cache: Arc::new(ValueCache::new(block_cache_size)),
670 execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
671 chain_modes,
672 delivery_notifiers: Arc::default(),
673 chain_workers,
674 }
675 }
676
677 #[instrument(level = "trace", skip(self, certificate, notifier))]
678 #[inline]
679 pub async fn fully_handle_certificate_with_notifications<T>(
680 &self,
681 certificate: GenericCertificate<T>,
682 notifier: &impl Notifier,
683 ) -> Result<ChainInfoResponse, WorkerError>
684 where
685 T: ProcessableCertificate,
686 {
687 let notifications = (*notifier).clone();
688 let this = self.clone();
689 linera_base::Task::spawn(async move {
690 let (response, actions) =
691 ProcessableCertificate::process_certificate(&this, certificate).await?;
692 notifications.notify(&actions.notifications);
693 let mut requests = VecDeque::from(actions.cross_chain_requests);
694 while let Some(request) = requests.pop_front() {
695 let actions = this.handle_cross_chain_request(request).await?;
696 requests.extend(actions.cross_chain_requests);
697 notifications.notify(&actions.notifications);
698 }
699 Ok(response)
700 })
701 .await
702 }
703
704 async fn chain_read<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
709 where
710 F: FnOnce(OwnedRwLockReadGuard<ChainWorkerState<StorageClient>>) -> Fut,
711 Fut: std::future::Future<Output = Result<R, WorkerError>>,
712 {
713 let state = self.get_or_create_chain_worker(chain_id).await?;
714 Box::pin(wrap_future(async move {
715 let guard = handle::read_lock(&state).await;
716 f(guard).await
717 }))
718 .await
719 }
720
721 async fn chain_write<R, F, Fut>(&self, chain_id: ChainId, f: F) -> Result<R, WorkerError>
727 where
728 F: FnOnce(handle::RollbackGuard<StorageClient>) -> Fut,
729 Fut: std::future::Future<Output = Result<R, WorkerError>>,
730 {
731 let state = self.get_or_create_chain_worker(chain_id).await?;
732 Box::pin(wrap_future(async move {
733 let guard = handle::write_lock(&state).await;
734 f(guard).await
735 }))
736 .await
737 }
738
739 fn get_or_create_chain_worker(
745 &self,
746 chain_id: ChainId,
747 ) -> std::pin::Pin<
748 Box<
749 impl std::future::Future<Output = Result<ChainWorkerArc<StorageClient>, WorkerError>> + '_,
750 >,
751 > {
752 Box::pin(wrap_future(async move {
753 {
755 let guard = self.chain_workers.pin();
756 if let Some(weak) = guard.get(&chain_id) {
757 if let Some(strong) = weak.upgrade() {
758 return Ok(strong);
759 }
760 }
761 }
762
763 let delivery_notifier = self
765 .delivery_notifiers
766 .lock()
767 .unwrap()
768 .entry(chain_id)
769 .or_default()
770 .clone();
771
772 let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
773 chain_modes
774 .read()
775 .unwrap()
776 .get(&chain_id)
777 .is_some_and(ListeningMode::is_full)
778 });
779
780 let (service_runtime_task, service_runtime_endpoint) =
781 if self.chain_worker_config.long_lived_services {
782 let actor =
783 ServiceRuntimeActor::spawn(chain_id, self.storage.thread_pool()).await;
784 (Some(actor.task), Some(actor.endpoint))
785 } else {
786 (None, None)
787 };
788
789 let state = crate::chain_worker::state::ChainWorkerState::load(
790 self.chain_worker_config.clone(),
791 self.storage.clone(),
792 self.block_cache.clone(),
793 self.execution_state_cache.clone(),
794 self.chain_modes.clone(),
795 delivery_notifier,
796 chain_id,
797 service_runtime_endpoint,
798 service_runtime_task,
799 )
800 .await?;
801
802 let worker = handle::create_chain_worker(state, is_tracked, &self.chain_worker_config);
803
804 let weak = Arc::downgrade(&worker);
806 let guard = self.chain_workers.guard();
807 match self.chain_workers.compute(
808 chain_id,
809 |existing| match existing {
810 Some((_, e)) => match e.upgrade() {
811 Some(live) => papaya::Operation::Abort(live),
812 None => papaya::Operation::Insert(weak.clone()),
813 },
814 None => papaya::Operation::Insert(weak.clone()),
815 },
816 &guard,
817 ) {
818 papaya::Compute::Aborted(existing, ..) => Ok(existing),
819 papaya::Compute::Inserted { .. } | papaya::Compute::Updated { .. } => Ok(worker),
820 papaya::Compute::Removed { .. } => unreachable!(),
821 }
822 }))
823 }
824
825 #[instrument(level = "trace", skip(self, block))]
827 pub async fn stage_block_execution(
828 &self,
829 block: ProposedBlock,
830 round: Option<u32>,
831 published_blobs: Vec<Blob>,
832 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
833 let chain_id = block.chain_id;
834 self.chain_write(chain_id, |mut guard| async move {
835 guard
836 .stage_block_execution(block, round, &published_blobs)
837 .await
838 })
839 .await
840 }
841
842 #[instrument(level = "trace", skip(self, block))]
847 pub async fn stage_block_execution_with_policy(
848 &self,
849 block: ProposedBlock,
850 round: Option<u32>,
851 published_blobs: Vec<Blob>,
852 policy: BundleExecutionPolicy,
853 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), WorkerError> {
854 let chain_id = block.chain_id;
855 self.chain_write(chain_id, |mut guard| async move {
856 guard
857 .stage_block_execution_with_policy(block, round, &published_blobs, policy)
858 .await
859 })
860 .await
861 }
862
863 #[instrument(level = "trace", skip(self, chain_id, query))]
868 pub async fn query_application(
869 &self,
870 chain_id: ChainId,
871 query: Query,
872 block_hash: Option<CryptoHash>,
873 ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
874 self.chain_write(chain_id, |mut guard| async move {
875 guard.query_application(query, block_hash).await
876 })
877 .await
878 }
879
880 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
881 nickname = %self.nickname(),
882 chain_id = %chain_id,
883 application_id = %application_id
884 ))]
885 pub async fn describe_application(
886 &self,
887 chain_id: ChainId,
888 application_id: ApplicationId,
889 ) -> Result<ApplicationDescription, WorkerError> {
890 let state = self.get_or_create_chain_worker(chain_id).await?;
891 let guard = handle::read_lock_initialized(&state).await?;
892 guard.describe_application_readonly(application_id).await
893 }
894
895 #[instrument(
897 level = "trace",
898 skip(self, certificate, notify_when_messages_are_delivered),
899 fields(
900 nickname = %self.nickname(),
901 chain_id = %certificate.block().header.chain_id,
902 block_height = %certificate.block().header.height
903 )
904 )]
905 async fn process_confirmed_block(
906 &self,
907 certificate: ConfirmedBlockCertificate,
908 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
909 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
910 let chain_id = certificate.block().header.chain_id;
911 self.chain_write(chain_id, |mut guard| async move {
912 guard
913 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
914 .await
915 })
916 .await
917 }
918
919 #[instrument(level = "trace", skip(self, certificate), fields(
921 nickname = %self.nickname(),
922 chain_id = %certificate.block().header.chain_id,
923 block_height = %certificate.block().header.height
924 ))]
925 async fn process_validated_block(
926 &self,
927 certificate: ValidatedBlockCertificate,
928 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
929 let chain_id = certificate.block().header.chain_id;
930 self.chain_write(chain_id, |mut guard| async move {
931 guard.process_validated_block(certificate).await
932 })
933 .await
934 }
935
936 #[instrument(level = "trace", skip(self, certificate), fields(
938 nickname = %self.nickname(),
939 chain_id = %certificate.value().chain_id(),
940 height = %certificate.value().height()
941 ))]
942 async fn process_timeout(
943 &self,
944 certificate: TimeoutCertificate,
945 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
946 let chain_id = certificate.value().chain_id();
947 self.chain_write(chain_id, |mut guard| async move {
948 guard.process_timeout(certificate).await
949 })
950 .await
951 }
952
953 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
954 nickname = %self.nickname(),
955 origin = %origin,
956 recipient = %recipient,
957 num_bundles = %bundles.len()
958 ))]
959 async fn process_cross_chain_update(
960 &self,
961 origin: ChainId,
962 recipient: ChainId,
963 bundles: Vec<(Epoch, MessageBundle)>,
964 ) -> Result<Option<BlockHeight>, WorkerError> {
965 self.chain_write(recipient, |mut guard| async move {
966 guard.process_cross_chain_update(origin, bundles).await
967 })
968 .await
969 }
970
/// Test-only: reads the confirmed block certificate at `height` on `chain_id`, if any.
///
/// Uses `handle::read_lock_initialized`, so errors from acquiring that guard are
/// returned before the read.
#[instrument(level = "trace", skip(self, chain_id, height), fields(
    nickname = %self.nickname(),
    chain_id = %chain_id,
    height = %height
))]
#[cfg(with_testing)]
pub async fn read_certificate(
    &self,
    chain_id: ChainId,
    height: BlockHeight,
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
    let worker = self.get_or_create_chain_worker(chain_id).await?;
    let read_guard = handle::read_lock_initialized(&worker).await?;
    let certificate = read_guard.read_certificate(height).await?;
    Ok(certificate)
}
987
988 #[instrument(level = "trace", skip(self), fields(
994 nickname = %self.nickname(),
995 chain_id = %chain_id
996 ))]
997 pub async fn chain_state_view(
998 &self,
999 chain_id: ChainId,
1000 ) -> Result<ChainStateViewReadGuard<StorageClient>, WorkerError> {
1001 let state = self.get_or_create_chain_worker(chain_id).await?;
1002 let guard = handle::read_lock(&state).await;
1003 Ok(ChainStateViewReadGuard(OwnedRwLockReadGuard::map(
1004 guard,
1005 |s| s.chain(),
1006 )))
1007 }
1008
1009 #[instrument(skip_all, fields(
1010 nick = self.nickname(),
1011 chain_id = format!("{:.8}", proposal.content.block.chain_id),
1012 height = %proposal.content.block.height,
1013 ))]
1014 pub async fn handle_block_proposal(
1015 &self,
1016 proposal: BlockProposal,
1017 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1018 trace!("{} <-- {:?}", self.nickname(), proposal);
1019 #[cfg(with_metrics)]
1020 let round = proposal.content.round;
1021
1022 let chain_id = proposal.content.block.chain_id;
1023 let now = self.storage.clock().current_time();
1025 let block_timestamp = proposal.content.block.timestamp;
1026 let delta = block_timestamp.delta_since(now);
1027 let grace_period = TimeDelta::from_micros(
1028 u64::try_from(self.chain_worker_config.block_time_grace_period.as_micros())
1029 .unwrap_or(u64::MAX),
1030 );
1031 if delta > TimeDelta::ZERO && delta <= grace_period {
1032 self.storage.clock().sleep_until(block_timestamp).await;
1033 }
1034
1035 let response = self
1036 .chain_write(chain_id, |mut guard| async move {
1037 guard.handle_block_proposal(proposal).await
1038 })
1039 .await?;
1040 #[cfg(with_metrics)]
1041 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1042 .with_label_values(&[round.type_name()])
1043 .observe(round.number() as f64);
1044 Ok(response)
1045 }
1046
1047 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1050 pub async fn handle_lite_certificate(
1051 &self,
1052 certificate: LiteCertificate<'_>,
1053 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1054 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1055 match self.full_certificate(certificate).await? {
1056 Either::Left(confirmed) => {
1057 Box::pin(
1058 self.handle_confirmed_certificate(
1059 confirmed,
1060 notify_when_messages_are_delivered,
1061 ),
1062 )
1063 .await
1064 }
1065 Either::Right(validated) => {
1066 if let Some(notifier) = notify_when_messages_are_delivered {
1067 if let Err(()) = notifier.send(()) {
1069 warn!("Failed to notify message delivery to caller");
1070 }
1071 }
1072 Box::pin(self.handle_validated_certificate(validated)).await
1073 }
1074 }
1075 }
1076
1077 #[instrument(skip_all, fields(
1079 nick = self.nickname(),
1080 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1081 height = %certificate.block().header.height,
1082 ))]
1083 pub async fn handle_confirmed_certificate(
1084 &self,
1085 certificate: ConfirmedBlockCertificate,
1086 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1087 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1088 trace!("{} <-- {:?}", self.nickname(), certificate);
1089 #[cfg(with_metrics)]
1090 let metrics_data = metrics::MetricsData::new(&certificate);
1091
1092 let (info, actions, _outcome) =
1093 Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1094 .await?;
1095
1096 #[cfg(with_metrics)]
1097 if matches!(_outcome, BlockOutcome::Processed) {
1098 metrics_data.record();
1099 }
1100 Ok((info, actions))
1101 }
1102
1103 #[instrument(skip_all, fields(
1105 nick = self.nickname(),
1106 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1107 height = %certificate.block().header.height,
1108 ))]
1109 pub async fn handle_validated_certificate(
1110 &self,
1111 certificate: ValidatedBlockCertificate,
1112 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1113 trace!("{} <-- {:?}", self.nickname(), certificate);
1114
1115 #[cfg(with_metrics)]
1116 let round = certificate.round;
1117 #[cfg(with_metrics)]
1118 let cert_str = certificate.inner().to_log_str();
1119
1120 let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1121 #[cfg(with_metrics)]
1122 {
1123 if matches!(_outcome, BlockOutcome::Processed) {
1124 metrics::NUM_ROUNDS_IN_CERTIFICATE
1125 .with_label_values(&[cert_str, round.type_name()])
1126 .observe(round.number() as f64);
1127 }
1128 }
1129 Ok((info, actions))
1130 }
1131
1132 #[instrument(skip_all, fields(
1134 nick = self.nickname(),
1135 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1136 height = %certificate.inner().height(),
1137 ))]
1138 pub async fn handle_timeout_certificate(
1139 &self,
1140 certificate: TimeoutCertificate,
1141 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1142 trace!("{} <-- {:?}", self.nickname(), certificate);
1143 self.process_timeout(certificate).await
1144 }
1145
1146 #[instrument(skip_all, fields(
1147 nick = self.nickname(),
1148 chain_id = format!("{:.8}", query.chain_id)
1149 ))]
1150 pub async fn handle_chain_info_query(
1151 &self,
1152 query: ChainInfoQuery,
1153 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1154 trace!("{} <-- {:?}", self.nickname(), query);
1155 #[cfg(with_metrics)]
1156 metrics::CHAIN_INFO_QUERIES.inc();
1157 let chain_id = query.chain_id;
1158 let result = self
1159 .chain_write(chain_id, |mut guard| async move {
1160 guard.handle_chain_info_query(query).await
1161 })
1162 .await;
1163 trace!("{} --> {:?}", self.nickname(), result);
1164 result
1165 }
1166
1167 #[instrument(skip_all, fields(
1168 nick = self.nickname(),
1169 chain_id = format!("{:.8}", chain_id)
1170 ))]
1171 pub async fn download_pending_blob(
1172 &self,
1173 chain_id: ChainId,
1174 blob_id: BlobId,
1175 ) -> Result<Blob, WorkerError> {
1176 trace!(
1177 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1178 self.nickname()
1179 );
1180 let result = self
1181 .chain_read(chain_id, |guard| async move {
1182 guard.download_pending_blob(blob_id).await
1183 })
1184 .await;
1185 trace!(
1186 "{} --> {:?}",
1187 self.nickname(),
1188 result.as_ref().map(|_| blob_id)
1189 );
1190 result
1191 }
1192
1193 #[instrument(skip_all, fields(
1194 nick = self.nickname(),
1195 chain_id = format!("{:.8}", chain_id)
1196 ))]
1197 pub async fn handle_pending_blob(
1198 &self,
1199 chain_id: ChainId,
1200 blob: Blob,
1201 ) -> Result<ChainInfoResponse, WorkerError> {
1202 let blob_id = blob.id();
1203 trace!(
1204 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1205 self.nickname()
1206 );
1207 let result = self
1208 .chain_write(chain_id, |mut guard| async move {
1209 guard.handle_pending_blob(blob).await
1210 })
1211 .await;
1212 trace!(
1213 "{} --> {:?}",
1214 self.nickname(),
1215 result.as_ref().map(|_| blob_id)
1216 );
1217 result
1218 }
1219
1220 #[instrument(skip_all, fields(
1221 nick = self.nickname(),
1222 chain_id = format!("{:.8}", request.target_chain_id())
1223 ))]
1224 pub async fn handle_cross_chain_request(
1225 &self,
1226 request: CrossChainRequest,
1227 ) -> Result<NetworkActions, WorkerError> {
1228 trace!("{} <-- {:?}", self.nickname(), request);
1229 match request {
1230 CrossChainRequest::UpdateRecipient {
1231 sender,
1232 recipient,
1233 bundles,
1234 } => {
1235 let mut actions = NetworkActions::default();
1236 let origin = sender;
1237 let Some(height) = self
1238 .process_cross_chain_update(origin, recipient, bundles)
1239 .await?
1240 else {
1241 return Ok(actions);
1242 };
1243 actions.notifications.push(Notification {
1244 chain_id: recipient,
1245 reason: Reason::NewIncomingBundle { origin, height },
1246 });
1247 actions
1248 .cross_chain_requests
1249 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1250 sender,
1251 recipient,
1252 latest_height: height,
1253 });
1254 Ok(actions)
1255 }
1256 CrossChainRequest::ConfirmUpdatedRecipient {
1257 sender,
1258 recipient,
1259 latest_height,
1260 } => {
1261 self.chain_write(sender, |mut guard| async move {
1262 guard
1263 .confirm_updated_recipient(recipient, latest_height)
1264 .await
1265 })
1266 .await?;
1267 Ok(NetworkActions::default())
1268 }
1269 }
1270 }
1271
1272 #[instrument(skip_all, fields(
1274 nickname = %self.nickname(),
1275 chain_id = %chain_id,
1276 num_trackers = %new_trackers.len()
1277 ))]
1278 pub async fn update_received_certificate_trackers(
1279 &self,
1280 chain_id: ChainId,
1281 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1282 ) -> Result<(), WorkerError> {
1283 self.chain_write(chain_id, |mut guard| async move {
1284 guard
1285 .update_received_certificate_trackers(new_trackers)
1286 .await
1287 })
1288 .await
1289 }
1290
1291 #[instrument(skip_all, fields(
1293 nickname = %self.nickname(),
1294 chain_id = %chain_id,
1295 start = %start,
1296 end = %end
1297 ))]
1298 pub async fn get_preprocessed_block_hashes(
1299 &self,
1300 chain_id: ChainId,
1301 start: BlockHeight,
1302 end: BlockHeight,
1303 ) -> Result<Vec<CryptoHash>, WorkerError> {
1304 self.chain_read(chain_id, |guard| async move {
1305 guard.get_preprocessed_block_hashes(start, end).await
1306 })
1307 .await
1308 }
1309
1310 #[instrument(skip_all, fields(
1312 nickname = %self.nickname(),
1313 chain_id = %chain_id,
1314 origin = %origin
1315 ))]
1316 pub async fn get_inbox_next_height(
1317 &self,
1318 chain_id: ChainId,
1319 origin: ChainId,
1320 ) -> Result<BlockHeight, WorkerError> {
1321 self.chain_read(chain_id, |guard| async move {
1322 guard.get_inbox_next_height(origin).await
1323 })
1324 .await
1325 }
1326
1327 #[instrument(skip_all, fields(
1330 nickname = %self.nickname(),
1331 chain_id = %chain_id,
1332 num_blob_ids = %blob_ids.len()
1333 ))]
1334 pub async fn get_locking_blobs(
1335 &self,
1336 chain_id: ChainId,
1337 blob_ids: Vec<BlobId>,
1338 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1339 self.chain_read(chain_id, |guard| async move {
1340 guard.get_locking_blobs(blob_ids).await
1341 })
1342 .await
1343 }
1344
1345 pub async fn get_block_hashes(
1347 &self,
1348 chain_id: ChainId,
1349 heights: Vec<BlockHeight>,
1350 ) -> Result<Vec<CryptoHash>, WorkerError> {
1351 self.chain_read(chain_id, |guard| async move {
1352 guard.get_block_hashes(heights).await
1353 })
1354 .await
1355 }
1356
1357 pub async fn get_proposed_blobs(
1359 &self,
1360 chain_id: ChainId,
1361 blob_ids: Vec<BlobId>,
1362 ) -> Result<Vec<Blob>, WorkerError> {
1363 self.chain_read(chain_id, |guard| async move {
1364 guard.get_proposed_blobs(blob_ids).await
1365 })
1366 .await
1367 }
1368
1369 pub async fn get_event_subscriptions(
1371 &self,
1372 chain_id: ChainId,
1373 ) -> Result<EventSubscriptionsResult, WorkerError> {
1374 self.chain_read(chain_id, |guard| async move {
1375 guard.get_event_subscriptions().await
1376 })
1377 .await
1378 }
1379
1380 pub async fn get_next_expected_event(
1382 &self,
1383 chain_id: ChainId,
1384 stream_id: StreamId,
1385 ) -> Result<Option<u32>, WorkerError> {
1386 self.chain_read(chain_id, |guard| async move {
1387 guard.get_next_expected_event(stream_id).await
1388 })
1389 .await
1390 }
1391
1392 pub async fn get_received_certificate_trackers(
1394 &self,
1395 chain_id: ChainId,
1396 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1397 self.chain_read(chain_id, |guard| async move {
1398 guard.get_received_certificate_trackers().await
1399 })
1400 .await
1401 }
1402
1403 pub async fn get_tip_state_and_outbox_info(
1405 &self,
1406 chain_id: ChainId,
1407 receiver_id: ChainId,
1408 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1409 self.chain_read(chain_id, |guard| async move {
1410 guard.get_tip_state_and_outbox_info(receiver_id).await
1411 })
1412 .await
1413 }
1414
1415 pub async fn get_next_height_to_preprocess(
1417 &self,
1418 chain_id: ChainId,
1419 ) -> Result<BlockHeight, WorkerError> {
1420 self.chain_read(chain_id, |guard| async move {
1421 guard.get_next_height_to_preprocess().await
1422 })
1423 .await
1424 }
1425}
1426
// Helpers that are only compiled when the `with_testing` cfg is set.
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage + Clone + 'static,
{
    /// Returns the public key of the key pair held in this worker's chain worker
    /// configuration.
    ///
    /// # Panics
    ///
    /// Panics if the configuration has no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        let key_pair = self.chain_worker_config.key_pair().expect(
            "Test validator should have a key pair assigned to it \
            in order to obtain its public key",
        );
        key_pair.public()
    }
}