1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
7 sync::{Arc, Mutex, RwLock},
8 time::Duration,
9};
10
11use futures::future::Either;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey},
14 data_types::{
15 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, Round, Timestamp,
16 },
17 doc_scalar,
18 hashed::Hashed,
19 identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
20 time::Instant,
21 util::traits::DynError,
22};
23#[cfg(with_testing)]
24use linera_chain::ChainExecutionContext;
25use linera_chain::{
26 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
27 types::{
28 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
29 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
30 },
31 ChainError, ChainStateView,
32};
33use linera_execution::{ExecutionError, ExecutionStateView, Query, QueryOutcome, ResourceTracker};
34use linera_storage::Storage;
35use linera_views::{context::InactiveContext, ViewError};
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
39use tracing::{instrument, trace, warn};
40
41pub(crate) use crate::chain_worker::{
43 ChainWorkerRequestReceiver, ChainWorkerRequestSender, EventSubscriptionsResult,
44};
45use crate::{
46 chain_worker::{
47 BlockOutcome, ChainWorkerActor, ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier,
48 },
49 client::ListeningMode,
50 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
51 join_set_ext::{JoinSet, JoinSetExt},
52 notifier::Notifier,
53 value_cache::ValueCache,
54 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
55};
56
57#[cfg(test)]
58#[path = "unit_tests/worker_tests.rs"]
59mod worker_tests;
60
/// Prometheus metrics recorded by the worker. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_histogram_vec,
        register_int_counter, register_int_counter_vec, register_int_gauge,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge};

    /// Round number of processed certificates, labelled by certificate value and round type.
    pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_certificate",
            "Number of rounds in certificate",
            &["certificate_value", "round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Round number of handled block proposals, labelled by round type.
    pub static NUM_ROUNDS_IN_BLOCK_PROPOSAL: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_rounds_in_block_proposal",
            "Number of rounds in block proposal",
            &["round_type"],
            exponential_bucket_interval(0.1, 50.0),
        )
    });

    /// Running total of transactions in recorded confirmed blocks.
    pub static TRANSACTION_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| register_int_counter_vec("transaction_count", "Transaction count", &[]));

    /// Running total of incoming bundles in recorded confirmed blocks.
    pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

    /// Running total of incoming messages in recorded confirmed blocks.
    pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

    /// Running total of operations in recorded confirmed blocks.
    pub static OPERATION_COUNT: LazyLock<IntCounter> =
        LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

    /// Distribution of the number of operations per recorded block.
    pub static OPERATIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "operations_per_block",
            "Number of operations per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of incoming bundles per recorded block.
    pub static INCOMING_BUNDLES_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "incoming_bundles_per_block",
            "Number of incoming bundles per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Distribution of the number of transactions per recorded block.
    pub static TRANSACTIONS_PER_BLOCK: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "transactions_per_block",
            "Number of transactions per block",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });

    /// Number of blocks recorded; incremented once per `MetricsData::record` call.
    pub static NUM_BLOCKS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks", "Number of blocks added to chains", &[])
    });

    /// Number of recorded certificates carrying a signature from each validator.
    pub static CERTIFICATES_SIGNED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "certificates_signed",
            "Number of confirmed block certificates signed by each validator",
            &["validator_name"],
        )
    });

    /// Number of chain info queries processed.
    pub static CHAIN_INFO_QUERIES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "chain_info_queries",
            "Number of chain info queries processed",
        )
    });

    /// Size of the worker's map of cached chain worker endpoints.
    pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock<IntGauge> = LazyLock::new(|| {
        register_int_gauge(
            "chain_worker_endpoints_cached",
            "Number of cached chain worker channel endpoints",
        )
    });

    /// Values read from a confirmed block certificate ahead of processing, so that the
    /// metrics can still be recorded once the certificate itself has been moved away.
    pub struct MetricsData {
        /// Label for the certificate value, as given by `to_log_str`.
        certificate_log_str: &'static str,
        /// Label for the type of the certificate's round.
        round_type: &'static str,
        /// Number of the certificate's round.
        round_number: u32,
        /// Number of transactions in the block body.
        confirmed_transactions: u64,
        /// Number of incoming bundles in the block body.
        confirmed_incoming_bundles: u64,
        /// Number of messages summed over all incoming bundles.
        confirmed_incoming_messages: u64,
        /// Number of operations in the block body.
        confirmed_operations: u64,
        /// String form of every validator name found in the certificate's signatures.
        validators_with_signatures: Vec<String>,
    }

    impl MetricsData {
        /// Collects the metric inputs from `certificate` without recording anything.
        pub fn new(certificate: &ConfirmedBlockCertificate) -> Self {
            let round = &certificate.round;
            let body = &certificate.block().body;

            let message_total: usize = body
                .incoming_bundles()
                .map(|bundle| bundle.messages().count())
                .sum();
            let signers: Vec<String> = certificate
                .signatures()
                .iter()
                .map(|(validator_name, _)| validator_name.to_string())
                .collect();

            Self {
                certificate_log_str: certificate.inner().to_log_str(),
                round_type: round.type_name(),
                round_number: round.number(),
                confirmed_transactions: body.transactions.len() as u64,
                confirmed_incoming_bundles: body.incoming_bundles().count() as u64,
                confirmed_incoming_messages: message_total as u64,
                confirmed_operations: body.operations().count() as u64,
                validators_with_signatures: signers,
            }
        }

        /// Publishes the collected values to the Prometheus metrics of this module.
        pub fn record(self) {
            /// Adds `amount` to `counter`; a zero `amount` leaves the lazy counter untouched.
            fn add_if_positive(counter: &LazyLock<IntCounter>, amount: u64) {
                if amount > 0 {
                    counter.inc_by(amount);
                }
            }

            let Self {
                certificate_log_str,
                round_type,
                round_number,
                confirmed_transactions,
                confirmed_incoming_bundles,
                confirmed_incoming_messages,
                confirmed_operations,
                validators_with_signatures,
            } = self;

            NUM_BLOCKS.with_label_values(&[]).inc();
            NUM_ROUNDS_IN_CERTIFICATE
                .with_label_values(&[certificate_log_str, round_type])
                .observe(round_number as f64);
            TRANSACTIONS_PER_BLOCK.observe(confirmed_transactions as f64);
            INCOMING_BUNDLES_PER_BLOCK.observe(confirmed_incoming_bundles as f64);
            OPERATIONS_PER_BLOCK.observe(confirmed_operations as f64);

            // The running totals are only touched for blocks that contain transactions.
            if confirmed_transactions > 0 {
                TRANSACTION_COUNT
                    .with_label_values(&[])
                    .inc_by(confirmed_transactions);
                add_if_positive(&INCOMING_BUNDLE_COUNT, confirmed_incoming_bundles);
                add_if_positive(&INCOMING_MESSAGE_COUNT, confirmed_incoming_messages);
                add_if_positive(&OPERATION_COUNT, confirmed_operations);
            }

            for validator_name in &validators_with_signatures {
                CERTIFICATES_SIGNED
                    .with_label_values(&[validator_name.as_str()])
                    .inc();
            }
        }
    }
}
222
/// Follow-up work returned alongside a response by the certificate and proposal handlers.
#[derive(Default, Debug)]
pub struct NetworkActions {
    /// Cross-chain requests still to be handled.
    pub cross_chain_requests: Vec<CrossChainRequest>,
    /// Notifications to hand to a notifier.
    pub notifications: Vec<Notification>,
}
231
232impl NetworkActions {
233 pub fn extend(&mut self, other: NetworkActions) {
234 self.cross_chain_requests.extend(other.cross_chain_requests);
235 self.notifications.extend(other.notifications);
236 }
237}
238
/// A notification about a chain: which chain it concerns and why it was emitted.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Notification {
    /// The chain this notification is about.
    pub chain_id: ChainId,
    /// What happened on that chain.
    pub reason: Reason,
}

doc_scalar!(
    Notification,
    "Notify that a chain has a new certified block or a new message"
);
250
/// The reason carried by a [`Notification`].
///
/// The type is serializable, so the order of variants and fields is part of its encoding.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Reason {
    /// A new block, identified by its height and hash.
    NewBlock {
        height: BlockHeight,
        hash: CryptoHash,
    },
    /// New events on the listed streams, with the height and hash of the block concerned.
    NewEvents {
        height: BlockHeight,
        hash: CryptoHash,
        event_streams: BTreeSet<StreamId>,
    },
    /// A new incoming bundle, identified by its origin chain and a height.
    NewIncomingBundle {
        origin: ChainId,
        height: BlockHeight,
    },
    /// A new round at the given height.
    NewRound {
        height: BlockHeight,
        round: Round,
    },
    /// A block was executed, identified by its height and hash.
    BlockExecuted {
        height: BlockHeight,
        hash: CryptoHash,
    },
}
276
/// Errors returned by the worker.
///
/// Variants marked `transparent` forward the display and source of the wrapped error; the
/// others carry their own message. See `WorkerError::is_local` for how each is classified.
#[derive(Debug, Error)]
pub enum WorkerError {
    /// A cryptographic error.
    #[error(transparent)]
    CryptoError(#[from] CryptoError),

    /// An arithmetic error.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// An error from the views layer.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Certificates with these hashes are listed in `confirmed_log` but missing from storage.
    #[error("Certificates are in confirmed_log but not in storage: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// A chain error, boxed to keep `WorkerError` small.
    #[error(transparent)]
    ChainError(#[from] Box<ChainError>),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    // Errors about block proposals and certificates.
    /// The block's signer is not an authorized owner.
    #[error("Block was not signed by an authorized owner")]
    InvalidOwner,

    /// The operations are not authenticated by the expected owner.
    #[error("Operations in the block are not authenticated by the proper owner: {0}")]
    InvalidSigner(AccountOwner),

    /// The block is not at the height the chain expects next.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },
    /// The given epoch does not match the chain's epoch.
    #[error("Unexpected epoch {epoch}: chain {chain_id} is at {chain_epoch}")]
    InvalidEpoch {
        chain_id: ChainId,
        chain_epoch: Epoch,
        epoch: Epoch,
    },

    /// The listed events could not be found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // Other server-side errors.
    /// A cross-chain request was invalid.
    #[error("Invalid cross-chain request")]
    InvalidCrossChainRequest,
    /// The block's previous-block hash is not the expected one.
    #[error("The block does not contain the hash that we expected for the previous block")]
    InvalidBlockChaining,
    /// The submitted execution outcome differs from the locally computed one.
    #[error(
        "The given outcome is not what we computed after executing the block.\n\
        Computed: {computed:#?}\n\
        Submitted: {submitted:#?}"
    )]
    IncorrectOutcome {
        computed: Box<BlockExecutionOutcome>,
        submitted: Box<BlockExecutionOutcome>,
    },
    /// The block timestamp is ahead of local time by more than the grace period.
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
        ({local_time}) than block time grace period ({block_time_grace_period:?})"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period: Duration,
    },
    /// The value referenced by a lite certificate is not available locally.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,
    /// The lite certificate does not match the value it was combined with.
    #[error("The hash certificate doesn't match its value.")]
    InvalidLiteCertificate,
    /// A fast block attempted to query an oracle.
    #[error("Fast blocks cannot query oracles")]
    FastBlockUsingOracles,
    /// The listed blobs could not be found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
    // NOTE(review): `{chain_id:8}` is a minimum-width spec, whereas the tracing spans in this
    // file shorten chain IDs with `{:.8}` (precision). Confirm which one is intended here and
    // in `PreprocessedBlocksEntryNotFound`.
    /// No `confirmed_log` entry exists at this height for the chain.
    #[error("confirmed_log entry at height {height} for chain {chain_id:8} not found")]
    ConfirmedLogEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// No `preprocessed_blocks` entry exists at this height for the chain.
    #[error("preprocessed_blocks entry at height {height} for chain {chain_id:8} not found")]
    PreprocessedBlocksEntryNotFound {
        height: BlockHeight,
        chain_id: ChainId,
    },
    /// The block proposal was rejected for the stated reason.
    #[error("The block proposal is invalid: {0}")]
    InvalidBlockProposal(String),
    /// A blob was received that no pending block requires.
    #[error("Blob was not required by any pending block")]
    UnexpectedBlob,
    /// The block publishes more blobs than the given limit.
    #[error("Number of published blobs per block must not exceed {0}")]
    TooManyPublishedBlobs(u64),
    /// The network description is missing.
    #[error("Missing network description")]
    MissingNetworkDescription,
    /// Sending a request to the chain worker actor of `chain_id` failed.
    #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
    ChainActorSendError {
        chain_id: ChainId,
        error: Box<dyn DynError>,
    },
    /// The chain worker actor of `chain_id` went away before answering a request.
    #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
    ChainActorRecvError {
        chain_id: ChainId,
        error: Box<dyn DynError>,
    },

    /// An error from the thread pool.
    #[error("thread error: {0}")]
    Thread(#[from] web_thread_pool::Error),
}
387
388impl WorkerError {
389 pub fn is_local(&self) -> bool {
393 match self {
394 WorkerError::CryptoError(_)
395 | WorkerError::ArithmeticError(_)
396 | WorkerError::InvalidOwner
397 | WorkerError::InvalidSigner(_)
398 | WorkerError::UnexpectedBlockHeight { .. }
399 | WorkerError::InvalidEpoch { .. }
400 | WorkerError::EventsNotFound(_)
401 | WorkerError::InvalidBlockChaining
402 | WorkerError::IncorrectOutcome { .. }
403 | WorkerError::InvalidTimestamp { .. }
404 | WorkerError::MissingCertificateValue
405 | WorkerError::InvalidLiteCertificate
406 | WorkerError::FastBlockUsingOracles
407 | WorkerError::BlobsNotFound(_)
408 | WorkerError::InvalidBlockProposal(_)
409 | WorkerError::UnexpectedBlob
410 | WorkerError::TooManyPublishedBlobs(_)
411 | WorkerError::ViewError(ViewError::NotFound(_)) => false,
412 WorkerError::BcsError(_)
413 | WorkerError::InvalidCrossChainRequest
414 | WorkerError::ViewError(_)
415 | WorkerError::ConfirmedLogEntryNotFound { .. }
416 | WorkerError::PreprocessedBlocksEntryNotFound { .. }
417 | WorkerError::MissingNetworkDescription
418 | WorkerError::ChainActorSendError { .. }
419 | WorkerError::ChainActorRecvError { .. }
420 | WorkerError::Thread(_)
421 | WorkerError::ReadCertificatesError(_) => true,
422 WorkerError::ChainError(chain_error) => chain_error.is_local(),
423 }
424 }
425}
426
427impl From<ChainError> for WorkerError {
428 #[instrument(level = "trace", skip(chain_error))]
429 fn from(chain_error: ChainError) -> Self {
430 match chain_error {
431 ChainError::ExecutionError(execution_error, context) => match *execution_error {
432 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
433 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
434 _ => Self::ChainError(Box::new(ChainError::ExecutionError(
435 execution_error,
436 context,
437 ))),
438 },
439 error => Self::ChainError(Box::new(error)),
440 }
441 }
442}
443
#[cfg(with_testing)]
impl WorkerError {
    /// Unwraps the `ExecutionError` nested in this error, for use in tests.
    ///
    /// # Panics
    ///
    /// Panics if this is not a `ChainError::ExecutionError`, or if its execution context
    /// differs from `expected_context`.
    pub fn expect_execution_error(self, expected_context: ChainExecutionContext) -> ExecutionError {
        let chain_error = match self {
            WorkerError::ChainError(chain_error) => chain_error,
            _ => panic!("Expected an `ExecutionError`. Got: {self:#?}"),
        };

        match *chain_error {
            ChainError::ExecutionError(execution_error, context) => {
                assert_eq!(context, expected_context);
                *execution_error
            }
            _ => panic!("Expected an `ExecutionError`. Got: {chain_error:#?}"),
        }
    }
}
465
/// State of a worker: it owns the storage client and the shared caches, and routes requests
/// to one chain worker actor per chain.
///
/// Apart from `nickname`, `storage` and the configuration, all fields are behind `Arc`s, so
/// clones of a `WorkerState` share them.
pub struct WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// A name used in logs and tracing spans.
    nickname: String,
    /// The storage client, cloned into every chain worker actor.
    storage: StorageClient,
    /// Configuration passed to every chain worker actor.
    chain_worker_config: ChainWorkerConfig,
    /// Cache of hashed blocks, keyed by hash; used to complete lite certificates.
    block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Cache of execution states, keyed by hash.
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
    /// Listening mode per chain; `None` unless built with `new_for_client`.
    chain_modes: Option<Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>>,
    /// One delivery notifier per chain, created on demand.
    delivery_notifiers: Arc<Mutex<DeliveryNotifiers>>,
    /// The set of spawned chain worker actor tasks.
    chain_worker_tasks: Arc<Mutex<JoinSet>>,
    /// Cached request senders of the chain worker actors, keyed by chain.
    chain_workers: Arc<Mutex<BTreeMap<ChainId, ChainActorEndpoint<StorageClient>>>>,
}
489
490impl<StorageClient> Clone for WorkerState<StorageClient>
491where
492 StorageClient: Storage + Clone,
493{
494 fn clone(&self) -> Self {
495 WorkerState {
496 nickname: self.nickname.clone(),
497 storage: self.storage.clone(),
498 chain_worker_config: self.chain_worker_config.clone(),
499 block_cache: self.block_cache.clone(),
500 execution_state_cache: self.execution_state_cache.clone(),
501 chain_modes: self.chain_modes.clone(),
502 delivery_notifiers: self.delivery_notifiers.clone(),
503 chain_worker_tasks: self.chain_worker_tasks.clone(),
504 chain_workers: self.chain_workers.clone(),
505 }
506 }
507}
508
/// The sending end of a chain worker actor's request channel.
///
/// Each message is a request together with the tracing span and the instant at which it
/// was sent.
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
    ChainWorkerRequest<<StorageClient as Storage>::Context>,
    tracing::Span,
    Instant,
)>;

/// The delivery notifiers of all chains, keyed by chain ID.
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
517
518impl<StorageClient> WorkerState<StorageClient>
519where
520 StorageClient: Storage,
521{
522 #[instrument(level = "trace", skip(nickname, key_pair, storage))]
523 pub fn new(
524 nickname: String,
525 key_pair: Option<ValidatorSecretKey>,
526 storage: StorageClient,
527 block_cache_size: usize,
528 execution_state_cache_size: usize,
529 ) -> Self {
530 WorkerState {
531 nickname,
532 storage,
533 chain_worker_config: ChainWorkerConfig::default().with_key_pair(key_pair),
534 block_cache: Arc::new(ValueCache::new(block_cache_size)),
535 execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
536 chain_modes: None,
537 delivery_notifiers: Arc::default(),
538 chain_worker_tasks: Arc::default(),
539 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
540 }
541 }
542
543 #[instrument(level = "trace", skip(nickname, storage))]
544 pub fn new_for_client(
545 nickname: String,
546 storage: StorageClient,
547 chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
548 block_cache_size: usize,
549 execution_state_cache_size: usize,
550 ) -> Self {
551 WorkerState {
552 nickname,
553 storage,
554 chain_worker_config: ChainWorkerConfig::default(),
555 block_cache: Arc::new(ValueCache::new(block_cache_size)),
556 execution_state_cache: Arc::new(ValueCache::new(execution_state_cache_size)),
557 chain_modes: Some(chain_modes),
558 delivery_notifiers: Arc::default(),
559 chain_worker_tasks: Arc::default(),
560 chain_workers: Arc::new(Mutex::new(BTreeMap::new())),
561 }
562 }
563
564 #[instrument(level = "trace", skip(self, value))]
565 pub fn with_allow_inactive_chains(mut self, value: bool) -> Self {
566 self.chain_worker_config.allow_inactive_chains = value;
567 self
568 }
569
570 #[instrument(level = "trace", skip(self, value))]
571 pub fn with_allow_messages_from_deprecated_epochs(mut self, value: bool) -> Self {
572 self.chain_worker_config
573 .allow_messages_from_deprecated_epochs = value;
574 self
575 }
576
577 #[instrument(level = "trace", skip(self, value))]
578 pub fn with_long_lived_services(mut self, value: bool) -> Self {
579 self.chain_worker_config.long_lived_services = value;
580 self
581 }
582
583 #[instrument(level = "trace", skip(self))]
588 pub fn with_block_time_grace_period(mut self, block_time_grace_period: Duration) -> Self {
589 self.chain_worker_config.block_time_grace_period = block_time_grace_period;
590 self
591 }
592
593 #[instrument(level = "trace", skip(self))]
597 pub fn with_chain_worker_ttl(mut self, chain_worker_ttl: Duration) -> Self {
598 self.chain_worker_config.ttl = chain_worker_ttl;
599 self
600 }
601
602 #[instrument(level = "trace", skip(self))]
606 pub fn with_sender_chain_worker_ttl(mut self, sender_chain_worker_ttl: Duration) -> Self {
607 self.chain_worker_config.sender_chain_ttl = sender_chain_worker_ttl;
608 self
609 }
610
611 #[instrument(level = "trace", skip(self))]
615 pub fn with_chain_info_max_received_log_entries(
616 mut self,
617 chain_info_max_received_log_entries: usize,
618 ) -> Self {
619 if chain_info_max_received_log_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
620 warn!(
621 "The value set for the maximum size of received_log entries \
622 may not be compatible with the latest clients: {} instead of {}",
623 chain_info_max_received_log_entries, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES
624 );
625 }
626 self.chain_worker_config.chain_info_max_received_log_entries =
627 chain_info_max_received_log_entries;
628 self
629 }
630
631 #[instrument(level = "trace", skip(self))]
632 pub fn nickname(&self) -> &str {
633 &self.nickname
634 }
635
636 #[instrument(level = "trace", skip(self))]
638 #[cfg(not(feature = "test"))]
639 pub(crate) fn storage_client(&self) -> &StorageClient {
640 &self.storage
641 }
642
643 #[instrument(level = "trace", skip(self))]
646 #[cfg(feature = "test")]
647 pub fn storage_client(&self) -> &StorageClient {
648 &self.storage
649 }
650
651 #[instrument(level = "trace", skip(self, certificate))]
652 pub(crate) async fn full_certificate(
653 &self,
654 certificate: LiteCertificate<'_>,
655 ) -> Result<Either<ConfirmedBlockCertificate, ValidatedBlockCertificate>, WorkerError> {
656 let block = self
657 .block_cache
658 .get(&certificate.value.value_hash)
659 .ok_or(WorkerError::MissingCertificateValue)?;
660
661 match certificate.value.kind {
662 linera_chain::types::CertificateKind::Confirmed => {
663 let value = ConfirmedBlock::from_hashed(block);
664 Ok(Either::Left(
665 certificate
666 .with_value(value)
667 .ok_or(WorkerError::InvalidLiteCertificate)?,
668 ))
669 }
670 linera_chain::types::CertificateKind::Validated => {
671 let value = ValidatedBlock::from_hashed(block);
672 Ok(Either::Right(
673 certificate
674 .with_value(value)
675 .ok_or(WorkerError::InvalidLiteCertificate)?,
676 ))
677 }
678 _ => Err(WorkerError::InvalidLiteCertificate),
679 }
680 }
681}
682
/// Certificate values that a [`WorkerState`] knows how to process.
///
/// On non-web targets `trait_variant::make(Send)` is applied to the trait.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ProcessableCertificate: CertificateValue + Sized + 'static {
    /// Processes `certificate` with `worker`, returning the chain info response and the
    /// network actions that resulted from it.
    async fn process_certificate<S: Storage + Clone + 'static>(
        worker: &WorkerState<S>,
        certificate: GenericCertificate<Self>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError>;
}
691
692impl ProcessableCertificate for ConfirmedBlock {
693 async fn process_certificate<S: Storage + Clone + 'static>(
694 worker: &WorkerState<S>,
695 certificate: ConfirmedBlockCertificate,
696 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
697 Box::pin(worker.handle_confirmed_certificate(certificate, None)).await
698 }
699}
700
701impl ProcessableCertificate for ValidatedBlock {
702 async fn process_certificate<S: Storage + Clone + 'static>(
703 worker: &WorkerState<S>,
704 certificate: ValidatedBlockCertificate,
705 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
706 Box::pin(worker.handle_validated_certificate(certificate)).await
707 }
708}
709
710impl ProcessableCertificate for Timeout {
711 async fn process_certificate<S: Storage + Clone + 'static>(
712 worker: &WorkerState<S>,
713 certificate: TimeoutCertificate,
714 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
715 worker.handle_timeout_certificate(certificate).await
716 }
717}
718
719impl<StorageClient> WorkerState<StorageClient>
720where
721 StorageClient: Storage + Clone + 'static,
722{
723 #[instrument(level = "trace", skip(self, certificate, notifier))]
724 #[inline]
725 pub async fn fully_handle_certificate_with_notifications<T>(
726 &self,
727 certificate: GenericCertificate<T>,
728 notifier: &impl Notifier,
729 ) -> Result<ChainInfoResponse, WorkerError>
730 where
731 T: ProcessableCertificate,
732 {
733 let notifications = (*notifier).clone();
734 let this = self.clone();
735 linera_base::task::spawn(async move {
736 let (response, actions) =
737 ProcessableCertificate::process_certificate(&this, certificate).await?;
738 notifications.notify(&actions.notifications);
739 let mut requests = VecDeque::from(actions.cross_chain_requests);
740 while let Some(request) = requests.pop_front() {
741 let actions = this.handle_cross_chain_request(request).await?;
742 requests.extend(actions.cross_chain_requests);
743 notifications.notify(&actions.notifications);
744 }
745 Ok(response)
746 })
747 .await
748 }
749
750 #[instrument(level = "trace", skip(self, block))]
752 pub async fn stage_block_execution(
753 &self,
754 block: ProposedBlock,
755 round: Option<u32>,
756 published_blobs: Vec<Blob>,
757 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), WorkerError> {
758 self.query_chain_worker(block.chain_id, move |callback| {
759 ChainWorkerRequest::StageBlockExecution {
760 block,
761 round,
762 published_blobs,
763 callback,
764 }
765 })
766 .await
767 }
768
769 #[instrument(level = "trace", skip(self, chain_id, query))]
774 pub async fn query_application(
775 &self,
776 chain_id: ChainId,
777 query: Query,
778 block_hash: Option<CryptoHash>,
779 ) -> Result<QueryOutcome, WorkerError> {
780 self.query_chain_worker(chain_id, move |callback| {
781 ChainWorkerRequest::QueryApplication {
782 query,
783 block_hash,
784 callback,
785 }
786 })
787 .await
788 }
789
790 #[instrument(level = "trace", skip(self, chain_id, application_id), fields(
791 nickname = %self.nickname,
792 chain_id = %chain_id,
793 application_id = %application_id
794 ))]
795 pub async fn describe_application(
796 &self,
797 chain_id: ChainId,
798 application_id: ApplicationId,
799 ) -> Result<ApplicationDescription, WorkerError> {
800 self.query_chain_worker(chain_id, move |callback| {
801 ChainWorkerRequest::DescribeApplication {
802 application_id,
803 callback,
804 }
805 })
806 .await
807 }
808
809 #[instrument(
811 level = "trace",
812 skip(self, certificate, notify_when_messages_are_delivered),
813 fields(
814 nickname = %self.nickname,
815 chain_id = %certificate.block().header.chain_id,
816 block_height = %certificate.block().header.height
817 )
818 )]
819 async fn process_confirmed_block(
820 &self,
821 certificate: ConfirmedBlockCertificate,
822 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
823 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
824 let chain_id = certificate.block().header.chain_id;
825 self.query_chain_worker(chain_id, move |callback| {
826 ChainWorkerRequest::ProcessConfirmedBlock {
827 certificate,
828 notify_when_messages_are_delivered,
829 callback,
830 }
831 })
832 .await
833 }
834
835 #[instrument(level = "trace", skip(self, certificate), fields(
837 nickname = %self.nickname,
838 chain_id = %certificate.block().header.chain_id,
839 block_height = %certificate.block().header.height
840 ))]
841 async fn process_validated_block(
842 &self,
843 certificate: ValidatedBlockCertificate,
844 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
845 let chain_id = certificate.block().header.chain_id;
846 self.query_chain_worker(chain_id, move |callback| {
847 ChainWorkerRequest::ProcessValidatedBlock {
848 certificate,
849 callback,
850 }
851 })
852 .await
853 }
854
855 #[instrument(level = "trace", skip(self, certificate), fields(
857 nickname = %self.nickname,
858 chain_id = %certificate.value().chain_id(),
859 height = %certificate.value().height()
860 ))]
861 async fn process_timeout(
862 &self,
863 certificate: TimeoutCertificate,
864 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
865 let chain_id = certificate.value().chain_id();
866 self.query_chain_worker(chain_id, move |callback| {
867 ChainWorkerRequest::ProcessTimeout {
868 certificate,
869 callback,
870 }
871 })
872 .await
873 }
874
875 #[instrument(level = "trace", skip(self, origin, recipient, bundles), fields(
876 nickname = %self.nickname,
877 origin = %origin,
878 recipient = %recipient,
879 num_bundles = %bundles.len()
880 ))]
881 async fn process_cross_chain_update(
882 &self,
883 origin: ChainId,
884 recipient: ChainId,
885 bundles: Vec<(Epoch, MessageBundle)>,
886 ) -> Result<Option<BlockHeight>, WorkerError> {
887 self.query_chain_worker(recipient, move |callback| {
888 ChainWorkerRequest::ProcessCrossChainUpdate {
889 origin,
890 bundles,
891 callback,
892 }
893 })
894 .await
895 }
896
897 #[instrument(level = "trace", skip(self, chain_id, height), fields(
899 nickname = %self.nickname,
900 chain_id = %chain_id,
901 height = %height
902 ))]
903 #[cfg(with_testing)]
904 pub async fn read_certificate(
905 &self,
906 chain_id: ChainId,
907 height: BlockHeight,
908 ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
909 self.query_chain_worker(chain_id, move |callback| {
910 ChainWorkerRequest::ReadCertificate { height, callback }
911 })
912 .await
913 }
914
915 #[instrument(level = "trace", skip(self), fields(
921 nickname = %self.nickname,
922 chain_id = %chain_id
923 ))]
924 pub async fn chain_state_view(
925 &self,
926 chain_id: ChainId,
927 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
928 self.query_chain_worker(chain_id, |callback| ChainWorkerRequest::GetChainStateView {
929 callback,
930 })
931 .await
932 }
933
934 #[instrument(level = "trace", skip(self, request_builder), fields(
936 nickname = %self.nickname,
937 chain_id = %chain_id
938 ))]
939 async fn query_chain_worker<Response>(
940 &self,
941 chain_id: ChainId,
942 request_builder: impl FnOnce(
943 oneshot::Sender<Result<Response, WorkerError>>,
944 ) -> ChainWorkerRequest<StorageClient::Context>,
945 ) -> Result<Response, WorkerError> {
946 let (callback, response) = oneshot::channel();
948 let request = request_builder(callback);
949
950 let new_channel = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
952
953 if let Some((sender, receiver)) = new_channel {
955 let delivery_notifier = self
956 .delivery_notifiers
957 .lock()
958 .unwrap()
959 .entry(chain_id)
960 .or_default()
961 .clone();
962
963 let is_tracked = self.chain_modes.as_ref().is_some_and(|chain_modes| {
964 chain_modes
965 .read()
966 .unwrap()
967 .get(&chain_id)
968 .is_some_and(ListeningMode::is_full)
969 });
970
971 let actor_task = ChainWorkerActor::run(
972 self.chain_worker_config.clone(),
973 self.storage.clone(),
974 self.block_cache.clone(),
975 self.execution_state_cache.clone(),
976 self.chain_modes.clone(),
977 delivery_notifier,
978 chain_id,
979 sender,
980 receiver,
981 is_tracked,
982 );
983
984 self.chain_worker_tasks
985 .lock()
986 .unwrap()
987 .spawn_task(actor_task);
988 }
989
990 match response.await {
992 Err(e) => {
993 Err(WorkerError::ChainActorRecvError {
995 chain_id,
996 error: Box::new(e),
997 })
998 }
999 Ok(response) => response,
1000 }
1001 }
1002
1003 #[instrument(level = "trace", skip(self), fields(
1008 nickname = %self.nickname,
1009 chain_id = %chain_id
1010 ))]
1011 #[expect(clippy::type_complexity)]
1012 fn call_and_maybe_create_chain_worker_endpoint(
1013 &self,
1014 chain_id: ChainId,
1015 request: ChainWorkerRequest<StorageClient::Context>,
1016 ) -> Result<
1017 Option<(
1018 ChainWorkerRequestSender<StorageClient::Context>,
1019 ChainWorkerRequestReceiver<StorageClient::Context>,
1020 )>,
1021 WorkerError,
1022 > {
1023 let mut chain_workers = self.chain_workers.lock().unwrap();
1024
1025 let (sender, new_channel) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
1026 (endpoint, None)
1027 } else {
1028 let (sender, receiver) = mpsc::unbounded_channel();
1029 (sender.clone(), Some((sender, receiver)))
1030 };
1031
1032 if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
1033 return Err(WorkerError::ChainActorSendError {
1035 chain_id,
1036 error: Box::new(e),
1037 });
1038 }
1039
1040 chain_workers.insert(chain_id, sender);
1042 #[cfg(with_metrics)]
1043 metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64);
1044
1045 Ok(new_channel)
1046 }
1047
1048 #[instrument(skip_all, fields(
1049 nick = self.nickname,
1050 chain_id = format!("{:.8}", proposal.content.block.chain_id),
1051 height = %proposal.content.block.height,
1052 ))]
1053 pub async fn handle_block_proposal(
1054 &self,
1055 proposal: BlockProposal,
1056 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1057 trace!("{} <-- {:?}", self.nickname, proposal);
1058 #[cfg(with_metrics)]
1059 let round = proposal.content.round;
1060 let response = self
1061 .query_chain_worker(proposal.content.block.chain_id, move |callback| {
1062 ChainWorkerRequest::HandleBlockProposal { proposal, callback }
1063 })
1064 .await?;
1065 #[cfg(with_metrics)]
1066 metrics::NUM_ROUNDS_IN_BLOCK_PROPOSAL
1067 .with_label_values(&[round.type_name()])
1068 .observe(round.number() as f64);
1069 Ok(response)
1070 }
1071
1072 #[instrument(skip_all, fields(hash = %certificate.value.value_hash))]
1075 pub async fn handle_lite_certificate(
1076 &self,
1077 certificate: LiteCertificate<'_>,
1078 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1079 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1080 match self.full_certificate(certificate).await? {
1081 Either::Left(confirmed) => {
1082 Box::pin(
1083 self.handle_confirmed_certificate(
1084 confirmed,
1085 notify_when_messages_are_delivered,
1086 ),
1087 )
1088 .await
1089 }
1090 Either::Right(validated) => {
1091 if let Some(notifier) = notify_when_messages_are_delivered {
1092 if let Err(()) = notifier.send(()) {
1094 warn!("Failed to notify message delivery to caller");
1095 }
1096 }
1097 Box::pin(self.handle_validated_certificate(validated)).await
1098 }
1099 }
1100 }
1101
1102 #[instrument(skip_all, fields(
1104 nick = self.nickname,
1105 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1106 height = %certificate.block().header.height,
1107 ))]
1108 pub async fn handle_confirmed_certificate(
1109 &self,
1110 certificate: ConfirmedBlockCertificate,
1111 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1112 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1113 trace!("{} <-- {:?}", self.nickname, certificate);
1114 #[cfg(with_metrics)]
1115 let metrics_data = metrics::MetricsData::new(&certificate);
1116
1117 let (info, actions, _outcome) =
1118 Box::pin(self.process_confirmed_block(certificate, notify_when_messages_are_delivered))
1119 .await?;
1120
1121 #[cfg(with_metrics)]
1122 if matches!(_outcome, BlockOutcome::Processed) {
1123 metrics_data.record();
1124 }
1125 Ok((info, actions))
1126 }
1127
1128 #[instrument(skip_all, fields(
1130 nick = self.nickname,
1131 chain_id = format!("{:.8}", certificate.block().header.chain_id),
1132 height = %certificate.block().header.height,
1133 ))]
1134 pub async fn handle_validated_certificate(
1135 &self,
1136 certificate: ValidatedBlockCertificate,
1137 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1138 trace!("{} <-- {:?}", self.nickname, certificate);
1139
1140 #[cfg(with_metrics)]
1141 let round = certificate.round;
1142 #[cfg(with_metrics)]
1143 let cert_str = certificate.inner().to_log_str();
1144
1145 let (info, actions, _outcome) = Box::pin(self.process_validated_block(certificate)).await?;
1146 #[cfg(with_metrics)]
1147 {
1148 if matches!(_outcome, BlockOutcome::Processed) {
1149 metrics::NUM_ROUNDS_IN_CERTIFICATE
1150 .with_label_values(&[cert_str, round.type_name()])
1151 .observe(round.number() as f64);
1152 }
1153 }
1154 Ok((info, actions))
1155 }
1156
1157 #[instrument(skip_all, fields(
1159 nick = self.nickname,
1160 chain_id = format!("{:.8}", certificate.inner().chain_id()),
1161 height = %certificate.inner().height(),
1162 ))]
1163 pub async fn handle_timeout_certificate(
1164 &self,
1165 certificate: TimeoutCertificate,
1166 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1167 trace!("{} <-- {:?}", self.nickname, certificate);
1168 self.process_timeout(certificate).await
1169 }
1170
1171 #[instrument(skip_all, fields(
1172 nick = self.nickname,
1173 chain_id = format!("{:.8}", query.chain_id)
1174 ))]
1175 pub async fn handle_chain_info_query(
1176 &self,
1177 query: ChainInfoQuery,
1178 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
1179 trace!("{} <-- {:?}", self.nickname, query);
1180 #[cfg(with_metrics)]
1181 metrics::CHAIN_INFO_QUERIES.inc();
1182 let result = self
1183 .query_chain_worker(query.chain_id, move |callback| {
1184 ChainWorkerRequest::HandleChainInfoQuery { query, callback }
1185 })
1186 .await;
1187 trace!("{} --> {:?}", self.nickname, result);
1188 result
1189 }
1190
1191 #[instrument(skip_all, fields(
1192 nick = self.nickname,
1193 chain_id = format!("{:.8}", chain_id)
1194 ))]
1195 pub async fn download_pending_blob(
1196 &self,
1197 chain_id: ChainId,
1198 blob_id: BlobId,
1199 ) -> Result<Blob, WorkerError> {
1200 trace!(
1201 "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
1202 self.nickname
1203 );
1204 let result = self
1205 .query_chain_worker(chain_id, move |callback| {
1206 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
1207 })
1208 .await;
1209 trace!(
1210 "{} --> {:?}",
1211 self.nickname,
1212 result.as_ref().map(|_| blob_id)
1213 );
1214 result
1215 }
1216
1217 #[instrument(skip_all, fields(
1218 nick = self.nickname,
1219 chain_id = format!("{:.8}", chain_id)
1220 ))]
1221 pub async fn handle_pending_blob(
1222 &self,
1223 chain_id: ChainId,
1224 blob: Blob,
1225 ) -> Result<ChainInfoResponse, WorkerError> {
1226 let blob_id = blob.id();
1227 trace!(
1228 "{} <-- handle_pending_blob({chain_id:8}, {blob_id:8})",
1229 self.nickname
1230 );
1231 let result = self
1232 .query_chain_worker(chain_id, move |callback| {
1233 ChainWorkerRequest::HandlePendingBlob { blob, callback }
1234 })
1235 .await;
1236 trace!(
1237 "{} --> {:?}",
1238 self.nickname,
1239 result.as_ref().map(|_| blob_id)
1240 );
1241 result
1242 }
1243
1244 #[instrument(skip_all, fields(
1245 nick = self.nickname,
1246 chain_id = format!("{:.8}", request.target_chain_id())
1247 ))]
1248 pub async fn handle_cross_chain_request(
1249 &self,
1250 request: CrossChainRequest,
1251 ) -> Result<NetworkActions, WorkerError> {
1252 trace!("{} <-- {:?}", self.nickname, request);
1253 match request {
1254 CrossChainRequest::UpdateRecipient {
1255 sender,
1256 recipient,
1257 bundles,
1258 } => {
1259 let mut actions = NetworkActions::default();
1260 let origin = sender;
1261 let Some(height) = self
1262 .process_cross_chain_update(origin, recipient, bundles)
1263 .await?
1264 else {
1265 return Ok(actions);
1266 };
1267 actions.notifications.push(Notification {
1268 chain_id: recipient,
1269 reason: Reason::NewIncomingBundle { origin, height },
1270 });
1271 actions
1272 .cross_chain_requests
1273 .push(CrossChainRequest::ConfirmUpdatedRecipient {
1274 sender,
1275 recipient,
1276 latest_height: height,
1277 });
1278 Ok(actions)
1279 }
1280 CrossChainRequest::ConfirmUpdatedRecipient {
1281 sender,
1282 recipient,
1283 latest_height,
1284 } => {
1285 self.query_chain_worker(sender, move |callback| {
1286 ChainWorkerRequest::ConfirmUpdatedRecipient {
1287 recipient,
1288 latest_height,
1289 callback,
1290 }
1291 })
1292 .await?;
1293 Ok(NetworkActions::default())
1294 }
1295 }
1296 }
1297
1298 #[instrument(skip_all, fields(
1300 nickname = %self.nickname,
1301 chain_id = %chain_id,
1302 num_trackers = %new_trackers.len()
1303 ))]
1304 pub async fn update_received_certificate_trackers(
1305 &self,
1306 chain_id: ChainId,
1307 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1308 ) -> Result<(), WorkerError> {
1309 self.query_chain_worker(chain_id, move |callback| {
1310 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
1311 new_trackers,
1312 callback,
1313 }
1314 })
1315 .await
1316 }
1317
1318 #[instrument(skip_all, fields(
1320 nickname = %self.nickname,
1321 chain_id = %chain_id,
1322 start = %start,
1323 end = %end
1324 ))]
1325 pub async fn get_preprocessed_block_hashes(
1326 &self,
1327 chain_id: ChainId,
1328 start: BlockHeight,
1329 end: BlockHeight,
1330 ) -> Result<Vec<CryptoHash>, WorkerError> {
1331 self.query_chain_worker(chain_id, move |callback| {
1332 ChainWorkerRequest::GetPreprocessedBlockHashes {
1333 start,
1334 end,
1335 callback,
1336 }
1337 })
1338 .await
1339 }
1340
1341 #[instrument(skip_all, fields(
1343 nickname = %self.nickname,
1344 chain_id = %chain_id,
1345 origin = %origin
1346 ))]
1347 pub async fn get_inbox_next_height(
1348 &self,
1349 chain_id: ChainId,
1350 origin: ChainId,
1351 ) -> Result<BlockHeight, WorkerError> {
1352 self.query_chain_worker(chain_id, move |callback| {
1353 ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1354 })
1355 .await
1356 }
1357
1358 #[instrument(skip_all, fields(
1361 nickname = %self.nickname,
1362 chain_id = %chain_id,
1363 num_blob_ids = %blob_ids.len()
1364 ))]
1365 pub async fn get_locking_blobs(
1366 &self,
1367 chain_id: ChainId,
1368 blob_ids: Vec<BlobId>,
1369 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1370 self.query_chain_worker(chain_id, move |callback| {
1371 ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1372 })
1373 .await
1374 }
1375
1376 pub async fn get_block_hashes(
1378 &self,
1379 chain_id: ChainId,
1380 heights: Vec<BlockHeight>,
1381 ) -> Result<Vec<CryptoHash>, WorkerError> {
1382 self.query_chain_worker(chain_id, move |callback| {
1383 ChainWorkerRequest::GetBlockHashes { heights, callback }
1384 })
1385 .await
1386 }
1387
1388 pub async fn get_proposed_blobs(
1390 &self,
1391 chain_id: ChainId,
1392 blob_ids: Vec<BlobId>,
1393 ) -> Result<Vec<Blob>, WorkerError> {
1394 self.query_chain_worker(chain_id, move |callback| {
1395 ChainWorkerRequest::GetProposedBlobs { blob_ids, callback }
1396 })
1397 .await
1398 }
1399
1400 pub async fn get_event_subscriptions(
1402 &self,
1403 chain_id: ChainId,
1404 ) -> Result<EventSubscriptionsResult, WorkerError> {
1405 self.query_chain_worker(chain_id, |callback| {
1406 ChainWorkerRequest::GetEventSubscriptions { callback }
1407 })
1408 .await
1409 }
1410
1411 pub async fn get_next_expected_event(
1413 &self,
1414 chain_id: ChainId,
1415 stream_id: StreamId,
1416 ) -> Result<Option<u32>, WorkerError> {
1417 self.query_chain_worker(chain_id, move |callback| {
1418 ChainWorkerRequest::GetNextExpectedEvent {
1419 stream_id,
1420 callback,
1421 }
1422 })
1423 .await
1424 }
1425
1426 pub async fn get_received_certificate_trackers(
1428 &self,
1429 chain_id: ChainId,
1430 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1431 self.query_chain_worker(chain_id, |callback| {
1432 ChainWorkerRequest::GetReceivedCertificateTrackers { callback }
1433 })
1434 .await
1435 }
1436
1437 pub async fn get_tip_state_and_outbox_info(
1439 &self,
1440 chain_id: ChainId,
1441 receiver_id: ChainId,
1442 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1443 self.query_chain_worker(chain_id, move |callback| {
1444 ChainWorkerRequest::GetTipStateAndOutboxInfo {
1445 receiver_id,
1446 callback,
1447 }
1448 })
1449 .await
1450 }
1451
1452 pub async fn get_next_height_to_preprocess(
1454 &self,
1455 chain_id: ChainId,
1456 ) -> Result<BlockHeight, WorkerError> {
1457 self.query_chain_worker(chain_id, |callback| {
1458 ChainWorkerRequest::GetNextHeightToPreprocess { callback }
1459 })
1460 .await
1461 }
1462}
1463
/// Helpers that are only compiled when testing support is enabled.
#[cfg(with_testing)]
impl<StorageClient> WorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Returns the public key of the key pair stored in this worker's chain worker
    /// configuration.
    ///
    /// # Panics
    ///
    /// Panics if the configuration has no key pair.
    #[instrument(level = "trace", skip(self))]
    pub fn public_key(&self) -> ValidatorPublicKey {
        self.chain_worker_config
            .key_pair()
            .expect(
                "Test validator should have a key pair assigned to it \
                 in order to obtain its public key",
            )
            .public()
    }
}