1use std::{
6 cmp::Ordering,
7 collections::{BTreeMap, BTreeSet},
8 sync::{Arc, RwLock},
9};
10
11use custom_debug_derive::Debug;
12use futures::{
13 future::Future,
14 stream::{self, AbortHandle, FuturesUnordered, StreamExt},
15};
16#[cfg(with_metrics)]
17use linera_base::prometheus_util::MeasureLatency as _;
18use linera_base::{
19 crypto::{CryptoHash, Signer as _, ValidatorPublicKey},
20 data_types::{ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, TimeDelta},
21 ensure,
22 identifiers::{AccountOwner, BlobId, BlobType, ChainId, StreamId},
23 time::Duration,
24};
25#[cfg(not(target_arch = "wasm32"))]
26use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
27use linera_chain::{
28 data_types::{BlockProposal, BundleExecutionPolicy, ChainAndHeight, LiteVote, ProposedBlock},
29 manager::LockingBlock,
30 types::{
31 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
32 LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
33 },
34 ChainError,
35};
36use linera_execution::committee::Committee;
37use linera_storage::{ResultReadCertificates, Storage as _};
38use rand::{
39 distributions::{Distribution, WeightedIndex},
40 seq::SliceRandom,
41};
42use received_log::ReceivedLogs;
43use serde::{Deserialize, Serialize};
44use tokio::sync::mpsc;
45use tracing::{debug, error, info, instrument, trace, warn};
46
47use crate::{
48 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
49 environment::Environment,
50 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
51 node::{CrossChainMessageDelivery, NodeError, ValidatorNodeProvider as _},
52 notifier::{ChannelNotifier, Notifier as _},
53 remote_node::RemoteNode,
54 updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
55 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
56 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
57};
58
59pub mod chain_client;
60pub use chain_client::ChainClient;
61
62pub use crate::data_types::ClientOutcome;
63
64#[cfg(test)]
65#[path = "../unit_tests/client_tests.rs"]
66mod client_tests;
67pub mod requests_scheduler;
68
69pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
70mod received_log;
71mod validator_trackers;
72
/// Latency histograms recorded by the client; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-free histogram whose buckets come from
    /// `exponential_bucket_latencies(500.0)`.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Histogram registered under the name `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Histogram registered under the name `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Histogram registered under the name `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Histogram registered under the name `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    /// Histogram registered under the name `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
126
/// Default certificate download batch size.
///
/// NOTE(review): presumably the default for `Options::certificate_download_batch_size`,
/// which caps each batch in `download_certificates_from` — confirm against `chain_client`.
pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default batch size for downloading sender certificates.
///
/// NOTE(review): not referenced in the visible part of this file — confirm where it is used.
pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
129
/// Label attached to a `u64` measurement sent through the optional timing channel
/// (see the `timing_sender` parameter of `Client::create_chain_client`).
///
/// NOTE(review): each variant presumably names the client phase being timed; the unit of the
/// accompanying `u64` is not visible here — confirm at the sending site.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    ExecuteOperations,
    ExecuteBlock,
    SubmitBlockProposal,
    UpdateValidators,
}
137
/// How closely the client listens to a chain; decides which notifications are relevant
/// (see [`ListeningMode::is_relevant`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Every notification is relevant.
    FullChain,
    /// Only `NewBlock` and `NewEvents` notifications are relevant.
    FollowChain,
    /// Only `NewEvents` notifications touching at least one of the listed streams are relevant.
    EventsOnly(BTreeSet<StreamId>),
}
153
154impl PartialOrd for ListeningMode {
155 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
156 match (self, other) {
157 (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
158 (ListeningMode::FullChain, _) => Some(Ordering::Greater),
159 (_, ListeningMode::FullChain) => Some(Ordering::Less),
160 (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
161 (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
162 (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
163 (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
164 if events_a.is_superset(events_b) {
165 Some(Ordering::Greater)
166 } else if events_b.is_superset(events_a) {
167 Some(Ordering::Less)
168 } else {
169 None
170 }
171 }
172 }
173 }
174}
175
176impl ListeningMode {
177 pub fn is_relevant(&self, reason: &Reason) -> bool {
180 match (reason, self) {
181 (_, ListeningMode::FullChain) => true,
183 (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
186 (Reason::NewEvents { .. }, ListeningMode::FollowChain) => true,
187 (_, ListeningMode::FollowChain) => false,
188 (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
190 relevant.intersection(event_streams).next().is_some()
191 }
192 (_, ListeningMode::EventsOnly(_)) => false,
193 }
194 }
195
196 pub fn extend(&mut self, other: Option<ListeningMode>) {
197 match (self, other) {
198 (_, None) => (),
199 (ListeningMode::FullChain, _) => (),
200 (mode, Some(ListeningMode::FullChain)) => {
201 *mode = ListeningMode::FullChain;
202 }
203 (ListeningMode::FollowChain, _) => (),
204 (mode, Some(ListeningMode::FollowChain)) => {
205 *mode = ListeningMode::FollowChain;
206 }
207 (
208 ListeningMode::EventsOnly(self_events),
209 Some(ListeningMode::EventsOnly(other_events)),
210 ) => {
211 self_events.extend(other_events);
212 }
213 }
214 }
215
216 pub fn is_follow_only(&self) -> bool {
219 !matches!(self, ListeningMode::FullChain)
220 }
221
222 pub fn is_full(&self) -> bool {
225 matches!(self, ListeningMode::FullChain)
226 }
227}
228
/// Client state shared by all chain clients: the local node, the validator request scheduler
/// and per-chain bookkeeping.
pub struct Client<Env: Environment> {
    /// Provides the storage, network, signer and wallet used by the client.
    environment: Env,
    /// Local node that handles certificates and chain info queries.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Scheduler through which certificates and blobs are downloaded from validators.
    requests_scheduler: RequestsScheduler<Env>,
    /// ID of the admin chain, whose committees determine the validators to contact.
    admin_chain_id: ChainId,
    /// Listening mode of each chain; the same `Arc` is handed to the worker state in `new`.
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// Notifier passed to the local node whenever a certificate is handled.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Per-chain client state, keyed by chain ID.
    chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Options cloned into every `ChainClient` created by `create_chain_client`.
    options: chain_client::Options,
}
249
250impl<Env: Environment> Client<Env> {
251 #[instrument(level = "trace", skip_all)]
253 #[allow(clippy::too_many_arguments)]
254 pub fn new(
255 environment: Env,
256 admin_chain_id: ChainId,
257 long_lived_services: bool,
258 chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
259 name: impl Into<String>,
260 chain_worker_ttl: Duration,
261 sender_chain_worker_ttl: Duration,
262 options: chain_client::Options,
263 block_cache_size: usize,
264 execution_state_cache_size: usize,
265 requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
266 ) -> Self {
267 let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
268 let state = WorkerState::new_for_client(
269 name.into(),
270 environment.storage().clone(),
271 chain_modes.clone(),
272 block_cache_size,
273 execution_state_cache_size,
274 )
275 .with_long_lived_services(long_lived_services)
276 .with_allow_inactive_chains(true)
277 .with_allow_messages_from_deprecated_epochs(true)
278 .with_chain_worker_ttl(chain_worker_ttl)
279 .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
280 let local_node = LocalNodeClient::new(state);
281 let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
282
283 Self {
284 environment,
285 local_node,
286 requests_scheduler,
287 chains: papaya::HashMap::new(),
288 admin_chain_id,
289 chain_modes,
290 notifier: Arc::new(ChannelNotifier::default()),
291 options,
292 }
293 }
294
/// Returns the ID of the admin chain.
pub fn admin_chain_id(&self) -> ChainId {
    self.admin_chain_id
}
299
/// Returns the storage of the client's environment.
pub fn storage_client(&self) -> &Env::Storage {
    self.environment.storage()
}
304
/// Returns the environment's network, which is used to create validator nodes
/// (see `make_nodes`).
pub fn validator_node_provider(&self) -> &Env::Network {
    self.environment.network()
}
308
/// Returns the signer of the client's environment.
#[instrument(level = "trace", skip(self))]
pub fn signer(&self) -> &Env::Signer {
    self.environment.signer()
}
314
/// Returns whether the signer holds a key for `owner`.
///
/// # Errors
///
/// A failure reported by the signer is wrapped with `chain_client::Error::signer_failure`.
pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, chain_client::Error> {
    self.signer()
        .contains_key(owner)
        .await
        .map_err(chain_client::Error::signer_failure)
}
322
/// Returns the wallet of the client's environment.
pub fn wallet(&self) -> &Env::Wallet {
    self.environment.wallet()
}
327
328 #[instrument(level = "trace", skip(self))]
331 pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
332 let mut chain_modes = self
333 .chain_modes
334 .write()
335 .expect("Panics should not happen while holding a lock to `chain_modes`");
336 let entry = chain_modes.entry(chain_id).or_insert(mode.clone());
337 entry.extend(Some(mode));
338 entry.clone()
339 }
340
341 pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
343 self.chain_modes
344 .read()
345 .expect("Panics should not happen while holding a lock to `chain_modes`")
346 .get(&chain_id)
347 .cloned()
348 }
349
350 pub fn is_tracked(&self, chain_id: ChainId) -> bool {
352 self.chain_modes
353 .read()
354 .expect("Panics should not happen while holding a lock to `chain_modes`")
355 .get(&chain_id)
356 .is_some_and(ListeningMode::is_full)
357 }
358
/// Creates a `ChainClient` for `chain_id` that shares this client.
///
/// A per-chain state built from `pending_proposal` and `follow_only` is registered through
/// `get_or_insert_with`.
/// NOTE(review): presumably an already registered state is kept and both arguments are then
/// ignored — confirm against papaya's `get_or_insert_with`.
/// The remaining arguments, together with a clone of the client options, are forwarded to
/// `ChainClient::new`.
#[expect(clippy::too_many_arguments)]
#[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
pub fn create_chain_client(
    self: &Arc<Self>,
    chain_id: ChainId,
    block_hash: Option<CryptoHash>,
    next_block_height: BlockHeight,
    pending_proposal: Option<PendingProposal>,
    preferred_owner: Option<AccountOwner>,
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    follow_only: bool,
) -> ChainClient<Env> {
    // Register the chain state before handing out a client for it.
    self.chains.pin().get_or_insert_with(chain_id, || {
        chain_client::State::new(pending_proposal.clone(), follow_only)
    });

    ChainClient::new(
        self.clone(),
        chain_id,
        self.options.clone(),
        block_hash,
        next_block_height,
        preferred_owner,
        timing_sender,
    )
}
388
/// Returns whether the registered state of `chain_id` is follow-only.
///
/// Returns `false` when no state is registered for the chain.
fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
    self.chains
        .pin()
        .get(&chain_id)
        .is_some_and(|state| state.is_follow_only())
}
396
/// Sets the follow-only flag in the registered state of `chain_id`.
///
/// The state is replaced by a modified copy.
/// NOTE(review): presumably a no-op when no state is registered for the chain — confirm
/// against papaya's `update`.
pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
    self.chains.pin().update(chain_id, |state| {
        let mut state = state.clone_for_update_unchecked();
        state.set_follow_only(follow_only);
        state
    });
}
405
406 async fn fetch_chain_info(
408 &self,
409 chain_id: ChainId,
410 validators: &[RemoteNode<Env::ValidatorNode>],
411 ) -> Result<Box<ChainInfo>, chain_client::Error> {
412 match self.local_node.chain_info(chain_id).await {
413 Ok(info) => Ok(info),
414 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
415 self.synchronize_chain_state(self.admin_chain_id).await?;
417 self.update_local_node_with_blobs_from(blob_ids, validators)
420 .await?;
421 Ok(self.local_node.chain_info(chain_id).await?)
422 }
423 Err(err) => Err(err.into()),
424 }
425 }
426
427 fn weighted_select(
428 remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
429 remaining_weights: &mut Vec<u64>,
430 ) -> Option<RemoteNode<Env::ValidatorNode>> {
431 if remaining_weights.is_empty() {
432 return None;
433 }
434 let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
435 let idx = dist.sample(&mut rand::thread_rng());
436 remaining_weights.remove(idx);
437 Some(remaining_validators.remove(idx))
438 }
439
440 #[instrument(level = "trace", skip(self))]
442 async fn download_certificates(
443 &self,
444 chain_id: ChainId,
445 target_next_block_height: BlockHeight,
446 ) -> Result<Box<ChainInfo>, chain_client::Error> {
447 let (_, committee) = self.admin_committee().await?;
448 let mut remaining_validators = self.make_nodes(&committee)?;
449 let mut info = self
450 .fetch_chain_info(chain_id, &remaining_validators)
451 .await?;
452 let mut remaining_weights = remaining_validators
454 .iter()
455 .map(|validator| {
456 let validator_state = committee.validators.get(&validator.public_key).unwrap();
457 validator_state.votes
458 })
459 .collect::<Vec<_>>();
460
461 while let Some(remote_node) =
462 Self::weighted_select(&mut remaining_validators, &mut remaining_weights)
463 {
464 if target_next_block_height <= info.next_block_height {
465 return Ok(info);
466 }
467 match self
468 .download_certificates_from(&remote_node, chain_id, target_next_block_height)
469 .await
470 {
471 Err(error) => info!(
472 remote_node = remote_node.address(),
473 %error,
474 "failed to download certificates from validator",
475 ),
476 Ok(Some(new_info)) => info = new_info,
477 Ok(None) => {}
478 }
479 }
480 ensure!(
481 target_next_block_height <= info.next_block_height,
482 chain_client::Error::CannotDownloadCertificates {
483 chain_id,
484 target_next_block_height,
485 }
486 );
487 Ok(info)
488 }
489
490 #[instrument(level = "trace", skip_all)]
493 async fn download_certificates_from(
494 &self,
495 remote_node: &RemoteNode<Env::ValidatorNode>,
496 chain_id: ChainId,
497 stop: BlockHeight,
498 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
499 let mut last_info = None;
500 let chain_info = self.local_node.chain_info(chain_id).await?;
502 let mut next_height = chain_info.next_block_height;
503 let hashes = self
504 .local_node
505 .get_preprocessed_block_hashes(chain_id, next_height, stop)
506 .await?;
507 let certificates = self.storage_client().read_certificates(&hashes).await?;
508 let certificates = match ResultReadCertificates::new(certificates, hashes) {
509 ResultReadCertificates::Certificates(certificates) => certificates,
510 ResultReadCertificates::InvalidHashes(hashes) => {
511 return Err(chain_client::Error::ReadCertificatesError(hashes))
512 }
513 };
514 for certificate in certificates {
515 last_info = Some(self.handle_certificate(certificate).await?.info);
516 }
517 while next_height < stop {
519 let limit = u64::from(stop)
521 .checked_sub(u64::from(next_height))
522 .ok_or(ArithmeticError::Overflow)?
523 .min(self.options.certificate_download_batch_size);
524
525 let certificates = self
526 .requests_scheduler
527 .download_certificates(remote_node, chain_id, next_height, limit)
528 .await?;
529 let Some(info) = self.process_certificates(remote_node, certificates).await? else {
530 break;
531 };
532 assert!(info.next_block_height > next_height);
533 next_height = info.next_block_height;
534 last_info = Some(info);
535 }
536 Ok(last_info)
537 }
538
539 async fn download_blobs(
540 &self,
541 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
542 blob_ids: &[BlobId],
543 ) -> Result<(), chain_client::Error> {
544 let blobs = &self
545 .requests_scheduler
546 .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
547 .await?
548 .ok_or_else(|| {
549 chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
550 })?;
551 self.local_node.store_blobs(blobs).await.map_err(Into::into)
552 }
553
554 #[instrument(level = "trace", skip_all)]
557 async fn process_certificates(
558 &self,
559 remote_node: &RemoteNode<Env::ValidatorNode>,
560 certificates: Vec<ConfirmedBlockCertificate>,
561 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
562 let mut info = None;
563 let required_blob_ids: Vec<_> = certificates
564 .iter()
565 .flat_map(|certificate| certificate.value().required_blob_ids())
566 .collect();
567
568 match self
569 .local_node
570 .read_blob_states_from_storage(&required_blob_ids)
571 .await
572 {
573 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
574 self.download_blobs(std::slice::from_ref(remote_node), &blob_ids)
575 .await?;
576 }
577 x => {
578 x?;
579 }
580 }
581
582 for certificate in certificates {
583 info = Some(
584 match self.handle_certificate(certificate.clone()).await {
585 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
586 self.download_blobs(std::slice::from_ref(remote_node), &blob_ids)
587 .await?;
588 self.handle_certificate(certificate).await?
589 }
590 x => x?,
591 }
592 .info,
593 );
594 }
595
596 Ok(info)
598 }
599
/// Passes `certificate` to the local node together with the client's notifier, and returns
/// the local node's response.
async fn handle_certificate<T: ProcessableCertificate>(
    &self,
    certificate: GenericCertificate<T>,
) -> Result<ChainInfoResponse, LocalNodeError> {
    self.local_node
        .handle_certificate(certificate, &self.notifier)
        .await
}
608
609 async fn chain_info_with_committees(
610 &self,
611 chain_id: ChainId,
612 ) -> Result<Box<ChainInfo>, LocalNodeError> {
613 let query = ChainInfoQuery::new(chain_id).with_committees();
614 let info = self.local_node.handle_chain_info_query(query).await?.info;
615 Ok(info)
616 }
617
618 #[instrument(level = "trace", skip_all)]
621 async fn admin_committees(
622 &self,
623 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
624 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
625 Ok((info.epoch, info.into_committees()?))
626 }
627
628 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
630 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
631 Ok((info.epoch, info.into_current_committee()?))
632 }
633
/// Returns remote nodes for the validators of the admin chain's current committee.
async fn validator_nodes(
    &self,
) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
    let (_, committee) = self.admin_committee().await?;
    Ok(self.make_nodes(&committee)?)
}
641
/// Creates remote nodes for `committee` via the validator node provider, pairing each node
/// with its validator public key.
fn make_nodes(
    &self,
    committee: &Committee,
) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
    Ok(self
        .validator_node_provider()
        .make_nodes(committee)?
        .map(|(public_key, node)| RemoteNode { public_key, node })
        .collect())
}
653
654 pub async fn get_chain_description_blob(
657 &self,
658 chain_id: ChainId,
659 ) -> Result<Blob, chain_client::Error> {
660 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
661 let blob = self
662 .local_node
663 .storage_client()
664 .read_blob(chain_desc_id)
665 .await?;
666 if let Some(blob) = blob {
667 return Ok(blob);
669 }
670 self.synchronize_chain_state(self.admin_chain_id).await?;
672 let nodes = self.validator_nodes().await?;
673 Ok(self
674 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
675 .await?
676 .pop()
677 .unwrap()) }
679
/// Returns the description of `chain_id`, BCS-decoded from its chain description blob
/// (see `get_chain_description_blob`).
pub async fn get_chain_description(
    &self,
    chain_id: ChainId,
) -> Result<ChainDescription, chain_client::Error> {
    let blob = self.get_chain_description_blob(chain_id).await?;
    Ok(bcs::from_bytes(blob.bytes())?)
}
689
/// Updates the registered state of `info.chain_id` from `info`.
///
/// The state is replaced by a modified copy.
/// NOTE(review): presumably a no-op when no state is registered for the chain — confirm
/// against papaya's `update`.
#[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
fn update_from_info(&self, info: &ChainInfo) {
    self.chains.pin().update(info.chain_id, |state| {
        let mut state = state.clone_for_update_unchecked();
        state.update_from_info(info);
        state
    });
}
699
/// Handles `certificate` in the local node, then updates the registered chain state from
/// the resulting chain info.
#[instrument(level = "trace", skip_all)]
async fn process_certificate<T: ProcessableCertificate>(
    &self,
    certificate: Box<GenericCertificate<T>>,
) -> Result<(), LocalNodeError> {
    let info = self.handle_certificate(*certificate).await?.info;
    self.update_from_info(&info);
    Ok(())
}
710
711 #[instrument(level = "trace", skip_all)]
713 pub(crate) async fn finalize_block(
714 self: &Arc<Self>,
715 committee: &Committee,
716 certificate: ValidatedBlockCertificate,
717 ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
718 debug!(round = %certificate.round, "Submitting block for confirmation");
719 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
720 let finalize_action = CommunicateAction::FinalizeBlock {
721 certificate: Box::new(certificate),
722 delivery: self.options.cross_chain_message_delivery,
723 };
724 let certificate = self
725 .communicate_chain_action(committee, finalize_action, hashed_value)
726 .await?;
727 self.receive_certificate_with_checked_signatures(certificate.clone())
728 .await?;
729 Ok(certificate)
730 }
731
/// Submits `proposal` to the validators of `committee` and returns the certificate for
/// `value` built from their votes.
///
/// While waiting, a background task tallies the clock skew reported by validators and logs
/// a warning once the reporting validators' weight reaches the validity threshold. The
/// certificate is processed locally before it is returned.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn submit_block_proposal<T: ProcessableCertificate>(
    self: &Arc<Self>,
    committee: &Committee,
    proposal: Box<BlockProposal>,
    value: T,
) -> Result<GenericCertificate<T>, chain_client::Error> {
    use linera_storage::Clock as _;

    debug!(
        round = %proposal.content.round,
        "Submitting block proposal to validators"
    );

    // Only logged: a block timestamp ahead of the local clock does not stop the submission.
    let block_timestamp = proposal.content.block.timestamp;
    let local_time = self.local_node.storage_client().clock().current_time();
    if block_timestamp > local_time {
        info!(
            chain_id = %proposal.content.block.chain_id,
            %block_timestamp,
            %local_time,
            "Block timestamp is in the future; waiting for validators",
        );
    }

    // Clock skew reports arrive on this channel; the sender travels inside the action.
    let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
    let submit_action = CommunicateAction::SubmitBlock {
        proposal,
        blob_ids: value.required_blob_ids().into_iter().collect(),
        clock_skew_sender,
    };

    let validity_threshold = committee.validity_threshold();
    // The spawned task takes ownership of its own copy of the committee.
    let committee_clone = committee.clone();
    let clock_skew_check_handle = linera_base::Task::spawn(async move {
        let mut skew_weight = 0u64;
        let mut min_skew = TimeDelta::MAX;
        let mut max_skew = TimeDelta::ZERO;
        // Runs until it warns once or the channel is closed.
        while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
            if clock_skew.as_micros() > 0 {
                skew_weight += committee_clone.weight(&public_key);
                min_skew = min_skew.min(clock_skew);
                max_skew = max_skew.max(clock_skew);
                if skew_weight >= validity_threshold {
                    warn!(
                        skew_weight,
                        validity_threshold,
                        min_skew_ms = min_skew.as_micros() / 1000,
                        max_skew_ms = max_skew.as_micros() / 1000,
                        "A validity threshold of validators reported clock skew; \
                         consider checking your system clock",
                    );
                    return;
                }
            }
        }
    });

    let certificate = self
        .communicate_chain_action(committee, submit_action, value)
        .await?;

    // NOTE(review): presumably every sender is dropped once the action has been consumed, so
    // the task ends and this await returns — confirm against `communicate_with_quorum`.
    clock_skew_check_handle.await;

    self.process_certificate(Box::new(certificate.clone()))
        .await?;
    Ok(certificate)
}
804
/// Sends the chain information of `chain_id` up to `height` to the validators of
/// `committee`, through `communicate_with_quorum`.
///
/// Each validator gets its own `ValidatorUpdater` and a clone of `latest_certificate`.
#[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
async fn communicate_chain_updates(
    self: &Arc<Self>,
    committee: &Committee,
    chain_id: ChainId,
    height: BlockHeight,
    delivery: CrossChainMessageDelivery,
    latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
) -> Result<(), chain_client::Error> {
    let nodes = self.make_nodes(committee)?;
    communicate_with_quorum(
        &nodes,
        committee,
        // Every successful response maps to the same group key `()`.
        |_: &()| (),
        |remote_node| {
            let mut updater = ValidatorUpdater {
                remote_node,
                client: self.clone(),
                admin_chain_id: self.admin_chain_id,
            };
            let certificate = latest_certificate.clone();
            Box::pin(async move {
                updater
                    .send_chain_information(chain_id, height, delivery, certificate)
                    .await
            })
        },
        self.options.quorum_grace_period,
    )
    .await?;
    Ok(())
}
838
/// Sends `action` to the validators of `committee` and builds a certificate for `value`
/// from the votes returned by `communicate_with_quorum`.
///
/// # Errors
///
/// - `UnexpectedQuorum` if the votes are for a different value hash or round than `value`
///   and `action`.
/// - `InternalError` if the votes cannot be combined into a lite certificate.
/// - `ProtocolError` if the lite certificate does not match `value`.
#[instrument(level = "trace", skip_all)]
async fn communicate_chain_action<T: CertificateValue>(
    self: &Arc<Self>,
    committee: &Committee,
    action: CommunicateAction,
    value: T,
) -> Result<GenericCertificate<T>, chain_client::Error> {
    let nodes = self.make_nodes(committee)?;
    let ((votes_hash, votes_round), votes) = communicate_with_quorum(
        &nodes,
        committee,
        // Votes are grouped by the value hash and round they were cast for.
        |vote: &LiteVote| (vote.value.value_hash, vote.round),
        |remote_node| {
            let mut updater = ValidatorUpdater {
                remote_node,
                client: self.clone(),
                admin_chain_id: self.admin_chain_id,
            };
            let action = action.clone();
            Box::pin(async move { updater.send_chain_update(action).await })
        },
        self.options.quorum_grace_period,
    )
    .await?;
    // The votes must be for exactly the value and round that were submitted.
    ensure!(
        (votes_hash, votes_round) == (value.hash(), action.round()),
        chain_client::Error::UnexpectedQuorum {
            hash: votes_hash,
            round: votes_round,
            expected_hash: value.hash(),
            expected_round: action.round(),
        }
    );
    let certificate = LiteCertificate::try_from_votes(votes)
        .ok_or_else(|| {
            chain_client::Error::InternalError(
                "Vote values or rounds don't match; this is a bug",
            )
        })?
        .with_value(value)
        .ok_or_else(|| {
            chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
        })?;
    Ok(certificate)
}
893
894 #[instrument(level = "trace", skip_all)]
897 async fn receive_certificate_with_checked_signatures(
898 &self,
899 certificate: ConfirmedBlockCertificate,
900 ) -> Result<(), chain_client::Error> {
901 let certificate = Box::new(certificate);
902 let block = certificate.block();
903 self.download_certificates(block.header.chain_id, block.header.height)
905 .await?;
906 if let Err(err) = self.process_certificate(certificate.clone()).await {
909 match &err {
910 LocalNodeError::BlobsNotFound(blob_ids) => {
911 self.download_blobs(&self.validator_nodes().await?, blob_ids)
912 .await
913 .map_err(|_| err)?;
914 self.process_certificate(certificate).await?;
915 }
916 _ => {
917 warn!("Failed to process network hashed certificate value");
919 return Err(err.into());
920 }
921 }
922 }
923
924 Ok(())
925 }
926
927 #[instrument(level = "trace", skip_all)]
929 #[allow(dead_code)] async fn receive_sender_certificate(
931 &self,
932 certificate: ConfirmedBlockCertificate,
933 mode: ReceiveCertificateMode,
934 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
935 ) -> Result<(), chain_client::Error> {
936 let (max_epoch, committees) = self.admin_committees().await?;
938 if let ReceiveCertificateMode::NeedsCheck = mode {
939 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
940 }
941 let nodes = if let Some(nodes) = nodes {
943 nodes
944 } else {
945 self.validator_nodes().await?
946 };
947 if let Err(err) = self.handle_certificate(certificate.clone()).await {
948 match &err {
949 LocalNodeError::BlobsNotFound(blob_ids) => {
950 self.download_blobs(&nodes, blob_ids).await?;
951 self.handle_certificate(certificate.clone()).await?;
952 }
953 _ => {
954 warn!("Failed to process network hashed certificate value");
956 return Err(err.into());
957 }
958 }
959 }
960
961 Ok(())
962 }
963
/// Downloads the blocks of `sender_chain_id` at `remote_heights` and processes them.
///
/// Each round asks the (shuffled) `nodes` concurrently; a node is only asked for heights
/// that `received_log` says it has. Every successfully processed block is reported on
/// `sender`. The loop ends when no heights remain or when every node has been dropped as
/// faulty. Errors are logged rather than returned.
///
/// NOTE(review): a height whose certificate passes the check but keeps failing in
/// `receive_sender_certificate` stays in the queue and is retried — confirm the loop
/// terminates in that case.
#[instrument(level = "trace", skip_all)]
async fn download_and_process_sender_chain(
    &self,
    sender_chain_id: ChainId,
    nodes: &[RemoteNode<Env::ValidatorNode>],
    received_log: &ReceivedLogs,
    mut remote_heights: Vec<BlockHeight>,
    sender: mpsc::UnboundedSender<ChainAndHeight>,
) {
    let (max_epoch, committees) = match self.admin_committees().await {
        Ok(result) => result,
        Err(error) => {
            error!(%error, %sender_chain_id, "could not read admin committees");
            return;
        }
    };
    let committees_ref = &committees;
    let mut nodes = nodes.to_vec();
    while !remote_heights.is_empty() {
        let remote_heights_ref = &remote_heights;
        nodes.shuffle(&mut rand::thread_rng());
        let certificates = match communicate_concurrently(
            &nodes,
            async move |remote_node| {
                let mut remote_heights = remote_heights_ref.clone();
                // Only request the heights this validator has according to the received log.
                remote_heights.retain(|height| {
                    received_log.validator_has_block(
                        &remote_node.public_key,
                        sender_chain_id,
                        *height,
                    )
                });
                if remote_heights.is_empty() {
                    // Nothing to request from this validator; reported as an error.
                    return Err(());
                }
                let certificates = self
                    .requests_scheduler
                    .download_certificates_by_heights(
                        &remote_node,
                        sender_chain_id,
                        remote_heights,
                    )
                    .await
                    .map_err(|_| ())?;
                let mut certificates_with_check_results = vec![];
                for cert in certificates {
                    if let Ok(check_result) =
                        Self::check_certificate(max_epoch, committees_ref, &cert)
                    {
                        certificates_with_check_results
                            .push((cert, check_result.into_result().is_ok()));
                    } else {
                        // A certificate that fails the check fails the whole response.
                        return Err(());
                    }
                }
                Ok(certificates_with_check_results)
            },
            |errors| {
                errors
                    .into_iter()
                    .map(|(validator, _error)| validator)
                    .collect::<BTreeSet<_>>()
            },
            self.options.certificate_batch_download_timeout,
        )
        .await
        {
            Ok(certificates_with_check_results) => certificates_with_check_results,
            Err(faulty_validators) => {
                // Drop the validators that failed and retry with the remaining ones.
                nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                if nodes.is_empty() {
                    info!(
                        chain_id = %sender_chain_id,
                        "could not download certificates for chain - no more correct validators left"
                    );
                    return;
                }
                continue;
            }
        };

        trace!(
            chain_id = %sender_chain_id,
            num_certificates = %certificates.len(),
            "received certificates",
        );

        let mut to_remove_from_queue = BTreeSet::new();

        for (certificate, check_result) in certificates {
            let hash = certificate.hash();
            let chain_id = certificate.block().header.chain_id;
            let height = certificate.block().header.height;
            if !check_result {
                // Dequeued without processing: `into_result()` failed for this certificate.
                to_remove_from_queue.insert(height);
                continue;
            }
            let mode = ReceiveCertificateMode::AlreadyChecked;
            if let Err(error) = self
                .receive_sender_certificate(certificate, mode, None)
                .await
            {
                warn!(%error, %hash, "Received invalid certificate");
            } else {
                to_remove_from_queue.insert(height);
                if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                    error!(
                        %chain_id,
                        %height,
                        %error,
                        "failed to send chain and height over the channel",
                    );
                }
            }
        }

        remote_heights.retain(|height| !to_remove_from_queue.contains(height));
    }
    trace!(
        chain_id = %sender_chain_id,
        "find_received_certificates: finished processing chain",
    );
}
1098
1099 #[instrument(level = "trace", skip(self))]
1101 async fn get_received_log_from_validator(
1102 &self,
1103 chain_id: ChainId,
1104 remote_node: &RemoteNode<Env::ValidatorNode>,
1105 tracker: u64,
1106 ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1107 let mut offset = tracker;
1108
1109 let mut remote_log = Vec::new();
1111 loop {
1112 trace!("get_received_log_from_validator: looping");
1113 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1114 let info = remote_node.handle_chain_info_query(query).await?;
1115 let received_entries = info.requested_received_log.len();
1116 offset += received_entries as u64;
1117 remote_log.extend(info.requested_received_log);
1118 trace!(
1119 remote_node = remote_node.address(),
1120 %received_entries,
1121 "get_received_log_from_validator: received log batch",
1122 );
1123 if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1124 break;
1125 }
1126 }
1127
1128 trace!(
1129 remote_node = remote_node.address(),
1130 num_entries = remote_log.len(),
1131 "get_received_log_from_validator: returning downloaded log",
1132 );
1133
1134 Ok(remote_log)
1135 }
1136
1137 async fn download_sender_block_with_sending_ancestors(
1143 &self,
1144 receiver_chain_id: ChainId,
1145 sender_chain_id: ChainId,
1146 height: BlockHeight,
1147 remote_node: &RemoteNode<Env::ValidatorNode>,
1148 ) -> Result<(), chain_client::Error> {
1149 let next_outbox_height = self
1150 .local_node
1151 .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1152 .await?
1153 .get(&sender_chain_id)
1154 .copied()
1155 .unwrap_or(BlockHeight::ZERO);
1156 let (max_epoch, committees) = self.admin_committees().await?;
1157
1158 let mut certificates = BTreeMap::new();
1161 let mut current_height = height;
1162
1163 while current_height >= next_outbox_height {
1165 let downloaded = self
1167 .requests_scheduler
1168 .download_certificates_by_heights(
1169 remote_node,
1170 sender_chain_id,
1171 vec![current_height],
1172 )
1173 .await?;
1174 let Some(certificate) = downloaded.into_iter().next() else {
1175 return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1176 chain_id: sender_chain_id,
1177 height: current_height,
1178 });
1179 };
1180
1181 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1183 .into_result()?;
1184
1185 let block = certificate.block();
1187 let next_height = block
1188 .body
1189 .previous_message_blocks
1190 .get(&receiver_chain_id)
1191 .map(|(_prev_hash, prev_height)| *prev_height);
1192
1193 certificates.insert(current_height, certificate);
1195
1196 if let Some(prev_height) = next_height {
1197 current_height = prev_height;
1199 } else {
1200 break;
1202 }
1203 }
1204
1205 if certificates.is_empty() {
1206 self.local_node
1207 .retry_pending_cross_chain_requests(sender_chain_id)
1208 .await?;
1209 }
1210
1211 for certificate in certificates.into_values() {
1213 self.receive_sender_certificate(
1214 certificate,
1215 ReceiveCertificateMode::AlreadyChecked,
1216 Some(vec![remote_node.clone()]),
1217 )
1218 .await?;
1219 }
1220
1221 Ok(())
1222 }
1223
1224 #[instrument(
1225 level = "trace", skip_all,
1226 fields(certificate_hash = ?incoming_certificate.hash()),
1227 )]
1228 fn check_certificate(
1229 highest_known_epoch: Epoch,
1230 committees: &BTreeMap<Epoch, Committee>,
1231 incoming_certificate: &ConfirmedBlockCertificate,
1232 ) -> Result<CheckCertificateResult, NodeError> {
1233 let block = incoming_certificate.block();
1234 if block.header.epoch > highest_known_epoch {
1236 return Ok(CheckCertificateResult::FutureEpoch);
1237 }
1238 if let Some(known_committee) = committees.get(&block.header.epoch) {
1239 incoming_certificate.check(known_committee)?;
1242 Ok(CheckCertificateResult::New)
1243 } else {
1244 Ok(CheckCertificateResult::OldEpoch)
1246 }
1247 }
1248
1249 #[instrument(level = "trace", skip_all)]
1253 async fn synchronize_chain_state(
1254 &self,
1255 chain_id: ChainId,
1256 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1257 let (_, committee) = self.admin_committee().await?;
1258 self.synchronize_chain_from_committee(chain_id, committee)
1259 .await
1260 }
1261
1262 #[instrument(level = "trace", skip_all)]
1267 pub(crate) async fn synchronize_chain_from_committee(
1268 &self,
1269 chain_id: ChainId,
1270 committee: Committee,
1271 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1272 #[cfg(with_metrics)]
1273 let _latency = if !self.is_chain_follow_only(chain_id) {
1274 Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1275 } else {
1276 None
1277 };
1278
1279 let validators = self.make_nodes(&committee)?;
1280 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1281 communicate_with_quorum(
1282 &validators,
1283 &committee,
1284 |_: &()| (),
1285 |remote_node| async move {
1286 self.synchronize_chain_state_from(&remote_node, chain_id)
1287 .await
1288 },
1289 self.options.quorum_grace_period,
1290 )
1291 .await?;
1292
1293 self.local_node
1294 .chain_info(chain_id)
1295 .await
1296 .map_err(Into::into)
1297 }
1298
    /// Synchronizes the local state of `chain_id` from a single validator.
    ///
    /// Certificates up to the validator's next block height are downloaded first. For
    /// chains that are not follow-only, and only if the local height then equals the
    /// remote one, the validator's chain manager values are also applied locally: the
    /// timeout certificate, the requested proposals, and the locking block.
    ///
    /// Proposals and locking blocks that cannot be processed are logged and skipped
    /// rather than reported as errors. Errors from downloading certificates, handling
    /// the timeout certificate, storing pending blobs, or fetching missing blobs and
    /// sender blocks are propagated.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    pub(crate) async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), chain_client::Error> {
        // Manager values are only requested for chains that are not follow-only.
        let with_manager_values = !self.is_chain_follow_only(chain_id);
        let query = if with_manager_values {
            ChainInfoQuery::new(chain_id).with_manager_values()
        } else {
            ChainInfoQuery::new(chain_id)
        };
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        let local_info = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
            .await?;

        if !with_manager_values {
            return Ok(());
        }

        // Use the height from the download result if there is one; otherwise ask the
        // local node.
        let local_height = match local_info {
            Some(info) => info.next_block_height,
            None => {
                self.local_node
                    .chain_info(chain_id)
                    .await?
                    .next_block_height
            }
        };
        // The manager values are only applied when both sides are at the same height.
        if local_height != remote_info.next_block_height {
            debug!(
                remote_node = remote_node.address(),
                remote_height = %remote_info.next_block_height,
                local_height = %local_height,
                "synced from validator, but remote height and local height are different",
            );
            return Ok(());
        };

        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(*timeout).await?;
        }
        // Collect every proposal the validator sent, to be processed in the loop below.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                // A fast locking block is a proposal: handle it with the others.
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                // A regular locking block is a certificate; failures are only logged.
                LockingBlock::Regular(cert) => {
                    let hash = cert.hash();
                    if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                    {
                        debug!(
                            remote_node = remote_node.address(),
                            %hash,
                            height = %local_height,
                            %error,
                            "skipping locked block from validator",
                        );
                    }
                }
            }
        }
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) = self
                .local_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    // First attempt: download the blobs the proposal itself requires as
                    // pending blobs from this validator.
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match self
                                .requests_scheduler
                                .download_pending_blob(remote_node, chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(error) => {
                                    info!(
                                        remote_node = remote_node.address(),
                                        height = %local_height,
                                        proposer = %owner,
                                        %blob_id,
                                        %error,
                                        "skipping proposal from validator; failed to download blob",
                                    );
                                    // One missing blob is enough to give up on this proposal.
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // Retry now that the pending blobs were handed to the local node.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    // Second attempt: if blobs are still reported missing, fetch them
                    // from this validator via `update_local_node_with_blobs_from`.
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            std::slice::from_ref(remote_node),
                        )
                        .await?;
                        // Retry with the blobs now available locally.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }
                // As long as the proposal fails because a cross-chain update is missing,
                // download the corresponding sender block (with its ancestors) and retry.
                // Any other error ends the loop.
                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                    if let ChainError::MissingCrossChainUpdate {
                        chain_id,
                        origin,
                        height,
                    } = &**chain_err
                    {
                        self.download_sender_block_with_sending_ancestors(
                            *chain_id,
                            *origin,
                            *height,
                            remote_node,
                        )
                        .await?;
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue 'proposal_loop;
                        }
                    } else {
                        break;
                    }
                }

                // Every recovery attempt failed: log the last error and move on to the
                // next proposal.
                debug!(
                    remote_node = remote_node.address(),
                    proposer = %owner,
                    height = %local_height,
                    error = %err,
                    "skipping proposal from validator",
                );
            }
        }
        Ok(())
    }
1479
1480 async fn try_process_locking_block_from(
1481 &self,
1482 remote_node: &RemoteNode<Env::ValidatorNode>,
1483 certificate: GenericCertificate<ValidatedBlock>,
1484 ) -> Result<(), chain_client::Error> {
1485 let chain_id = certificate.inner().chain_id();
1486 let certificate = Box::new(certificate);
1487 match self.process_certificate(certificate.clone()).await {
1488 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1489 let mut blobs = Vec::new();
1490 for blob_id in blob_ids {
1491 let blob_content = self
1492 .requests_scheduler
1493 .download_pending_blob(remote_node, chain_id, blob_id)
1494 .await?;
1495 blobs.push(Blob::new(blob_content));
1496 }
1497 self.local_node
1498 .handle_pending_blobs(chain_id, blobs)
1499 .await?;
1500 self.process_certificate(certificate).await?;
1501 Ok(())
1502 }
1503 Err(err) => Err(err.into()),
1504 Ok(()) => Ok(()),
1505 }
1506 }
1507
1508 async fn update_local_node_with_blobs_from(
1511 &self,
1512 blob_ids: Vec<BlobId>,
1513 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1514 ) -> Result<Vec<Blob>, chain_client::Error> {
1515 let timeout = self.options.blob_download_timeout;
1516 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1518 stream::iter(blob_ids.into_iter().map(|blob_id| {
1519 communicate_concurrently(
1520 remote_nodes,
1521 async move |remote_node| {
1522 let certificate = self
1523 .requests_scheduler
1524 .download_certificate_for_blob(&remote_node, blob_id)
1525 .await?;
1526 self.receive_sender_certificate(
1527 certificate,
1528 ReceiveCertificateMode::NeedsCheck,
1529 Some(vec![remote_node.clone()]),
1530 )
1531 .await?;
1532 let blob = self
1533 .local_node
1534 .storage_client()
1535 .read_blob(blob_id)
1536 .await?
1537 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1538 Result::<_, chain_client::Error>::Ok(blob)
1539 },
1540 move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
1541 timeout,
1542 )
1543 }))
1544 .buffer_unordered(self.options.max_joined_tasks)
1545 .collect::<Vec<_>>()
1546 .await
1547 .into_iter()
1548 .collect()
1549 }
1550
1551 #[instrument(level = "trace", skip(self, block))]
1561 async fn stage_block_execution_with_policy(
1562 &self,
1563 block: ProposedBlock,
1564 round: Option<u32>,
1565 published_blobs: Vec<Blob>,
1566 policy: BundleExecutionPolicy,
1567 ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1568 loop {
1569 let result = self
1570 .local_node
1571 .stage_block_execution_with_policy(
1572 block.clone(),
1573 round,
1574 published_blobs.clone(),
1575 policy,
1576 )
1577 .await;
1578 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1579 let validators = self.validator_nodes().await?;
1580 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1581 .await?;
1582 continue; }
1584 if let Ok((_, executed_block, _, _)) = &result {
1585 let hash = CryptoHash::new(executed_block);
1586 let notification = Notification {
1587 chain_id: executed_block.header.chain_id,
1588 reason: Reason::BlockExecuted {
1589 height: executed_block.header.height,
1590 hash,
1591 },
1592 };
1593 self.notifier.notify(&[notification]);
1594 }
1595 let (_modified_block, executed_block, response, _resource_tracker) = result?;
1596 return Ok((executed_block, response));
1597 }
1598 }
1599
1600 #[instrument(level = "trace", skip(self, block))]
1603 async fn stage_block_execution(
1604 &self,
1605 block: ProposedBlock,
1606 round: Option<u32>,
1607 published_blobs: Vec<Blob>,
1608 ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1609 loop {
1610 let result = self
1611 .local_node
1612 .stage_block_execution(block.clone(), round, published_blobs.clone())
1613 .await;
1614 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1615 let validators = self.validator_nodes().await?;
1616 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1617 .await?;
1618 continue; }
1620 if let Ok((block, _, _)) = &result {
1621 let hash = CryptoHash::new(block);
1622 let notification = Notification {
1623 chain_id: block.header.chain_id,
1624 reason: Reason::BlockExecuted {
1625 height: block.header.height,
1626 hash,
1627 },
1628 };
1629 self.notifier.notify(&[notification]);
1630 }
1631 let (block, response, _resource_tracker) = result?;
1632 return Ok((block, response));
1633 }
1634 }
1635}
1636
1637async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
1640 nodes: &[RemoteNode<A>],
1641 f: F,
1642 err: G,
1643 timeout: Duration,
1644) -> Result<V, E2>
1645where
1646 F: Clone + FnOnce(RemoteNode<A>) -> R,
1647 RemoteNode<A>: Clone,
1648 G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
1649 R: Future<Output = Result<V, E1>> + 'a,
1650{
1651 let mut stream = nodes
1652 .iter()
1653 .zip(0..)
1654 .map(|(remote_node, i)| {
1655 let fun = f.clone();
1656 let node = remote_node.clone();
1657 async move {
1658 linera_base::time::timer::sleep(timeout * i * i).await;
1659 fun(node).await.map_err(|err| (remote_node.public_key, err))
1660 }
1661 })
1662 .collect::<FuturesUnordered<_>>();
1663 let mut errors = vec![];
1664 while let Some(maybe_result) = stream.next().await {
1665 match maybe_result {
1666 Ok(result) => return Ok(result),
1667 Err(error) => errors.push(error),
1668 };
1669 }
1670 Err(err(errors))
1671}
1672
/// Wrapper around an [`AbortHandle`] that calls `abort()` on the handle when the
/// wrapper is dropped.
///
/// The type is `#[must_use]`: a value that is discarded immediately is dropped right
/// away, which triggers the abort.
#[must_use]
pub struct AbortOnDrop(pub AbortHandle);

impl Drop for AbortOnDrop {
    /// Aborts through the wrapped handle.
    #[instrument(level = "trace", skip(self))]
    fn drop(&mut self) {
        self.0.abort();
    }
}
1683
/// A proposed block bundled with a list of blobs, in a clonable and serializable form.
// NOTE(review): presumably this is a proposal that has not been finalized yet, kept so
// it can be retried — confirm against the code that stores it.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block.
    pub block: ProposedBlock,
    /// The blobs that accompany the block (presumably the ones it publishes — confirm
    /// against callers).
    pub blobs: Vec<Blob>,
}
1690
/// Mode flag passed to `receive_sender_certificate` along with a certificate.
// NOTE(review): judging by the call sites in this file, this states whether the
// certificate was already validated with `check_certificate` — confirm against
// `receive_sender_certificate` itself.
enum ReceiveCertificateMode {
    /// Passed for certificates that the caller has not validated.
    NeedsCheck,
    /// Passed for certificates that the caller has already validated.
    AlreadyChecked,
}
1695
1696enum CheckCertificateResult {
1697 OldEpoch,
1698 New,
1699 FutureEpoch,
1700}
1701
1702impl CheckCertificateResult {
1703 fn into_result(self) -> Result<(), chain_client::Error> {
1704 match self {
1705 Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
1706 Self::New => Ok(()),
1707 Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
1708 }
1709 }
1710}
1711
1712#[cfg(not(target_arch = "wasm32"))]
1714pub async fn create_bytecode_blobs(
1715 contract: Bytecode,
1716 service: Bytecode,
1717 vm_runtime: VmRuntime,
1718) -> (Vec<Blob>, ModuleId) {
1719 match vm_runtime {
1720 VmRuntime::Wasm => {
1721 let (compressed_contract, compressed_service) =
1722 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
1723 .await
1724 .expect("Compression should not panic");
1725 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1726 let service_blob = Blob::new_service_bytecode(compressed_service);
1727 let module_id =
1728 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1729 (vec![contract_blob, service_blob], module_id)
1730 }
1731 VmRuntime::Evm => {
1732 let compressed_contract = contract.compress();
1733 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1734 let module_id = ModuleId::new(
1735 evm_contract_blob.id().hash,
1736 evm_contract_blob.id().hash,
1737 vm_runtime,
1738 );
1739 (vec![evm_contract_blob], module_id)
1740 }
1741 }
1742}