1use std::{
6 cmp::Ordering,
7 collections::{BTreeMap, BTreeSet, HashSet},
8 sync::{Arc, RwLock},
9};
10
11use custom_debug_derive::Debug;
12use futures::{
13 future::Future,
14 stream::{self, AbortHandle, FuturesUnordered, StreamExt},
15};
16#[cfg(with_metrics)]
17use linera_base::prometheus_util::MeasureLatency as _;
18use linera_base::{
19 crypto::{CryptoHash, ValidatorPublicKey},
20 data_types::{ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, TimeDelta},
21 ensure,
22 identifiers::{AccountOwner, BlobId, BlobType, ChainId, GenericApplicationId, StreamId},
23 time::Duration,
24};
25#[cfg(not(target_arch = "wasm32"))]
26use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
27use linera_chain::{
28 data_types::{
29 BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
30 Transaction,
31 },
32 manager::LockingBlock,
33 types::{
34 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
35 LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
36 },
37 ChainError, ChainExecutionContext,
38};
39use linera_execution::committee::Committee;
40use linera_storage::{ResultReadCertificates, Storage as _};
41use rand::{
42 distributions::{Distribution, WeightedIndex},
43 seq::SliceRandom,
44};
45use received_log::ReceivedLogs;
46use serde::{Deserialize, Serialize};
47use tokio::sync::mpsc;
48use tracing::{debug, error, info, instrument, trace, warn};
49
50use crate::{
51 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, RoundTimeout},
52 environment::Environment,
53 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
54 node::{CrossChainMessageDelivery, NodeError, ValidatorNodeProvider as _},
55 notifier::{ChannelNotifier, Notifier as _},
56 remote_node::RemoteNode,
57 updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
58 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
59 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
60};
61
62pub mod chain_client;
63pub use chain_client::ChainClient;
64
65pub use crate::data_types::ClientOutcome;
66
67#[cfg(test)]
68#[path = "../unit_tests/client_tests.rs"]
69mod client_tests;
70pub mod requests_scheduler;
71
72pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
73mod received_log;
74mod validator_trackers;
75
76#[cfg(with_metrics)]
77mod metrics {
78 use std::sync::LazyLock;
79
80 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
81 use prometheus::HistogramVec;
82
83 pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
84 LazyLock::new(|| {
85 register_histogram_vec(
86 "process_inbox_latency",
87 "process_inbox latency",
88 &[],
89 exponential_bucket_latencies(500.0),
90 )
91 });
92
93 pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
94 register_histogram_vec(
95 "prepare_chain_latency",
96 "prepare_chain latency",
97 &[],
98 exponential_bucket_latencies(500.0),
99 )
100 });
101
102 pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
103 register_histogram_vec(
104 "synchronize_chain_state_latency",
105 "synchronize_chain_state latency",
106 &[],
107 exponential_bucket_latencies(500.0),
108 )
109 });
110
111 pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
112 register_histogram_vec(
113 "execute_block_latency",
114 "execute_block latency",
115 &[],
116 exponential_bucket_latencies(500.0),
117 )
118 });
119
120 pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
121 register_histogram_vec(
122 "find_received_certificates_latency",
123 "find_received_certificates latency",
124 &[],
125 exponential_bucket_latencies(500.0),
126 )
127 });
128}
129
130pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
131pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
132
133#[derive(Clone, Debug)]
135pub struct MessagePolicy {
136 blanket: BlanketMessagePolicy,
138 restrict_chain_ids_to: Option<HashSet<ChainId>>,
142 reject_message_bundles_without_application_ids: Option<HashSet<GenericApplicationId>>,
145 reject_message_bundles_with_other_application_ids: Option<HashSet<GenericApplicationId>>,
148}
149
150#[derive(Default, Copy, Clone, Debug, clap::ValueEnum, serde::Deserialize, tsify_next::Tsify)]
151pub enum BlanketMessagePolicy {
152 #[default]
154 Accept,
155 Reject,
158 Ignore,
161}
162
163impl MessagePolicy {
164 pub fn new(
165 blanket: BlanketMessagePolicy,
166 restrict_chain_ids_to: Option<HashSet<ChainId>>,
167 reject_message_bundles_without_application_ids: Option<HashSet<GenericApplicationId>>,
168 reject_message_bundles_with_other_application_ids: Option<HashSet<GenericApplicationId>>,
169 ) -> Self {
170 Self {
171 blanket,
172 restrict_chain_ids_to,
173 reject_message_bundles_without_application_ids,
174 reject_message_bundles_with_other_application_ids,
175 }
176 }
177
178 #[cfg(with_testing)]
179 pub fn new_accept_all() -> Self {
180 Self {
181 blanket: BlanketMessagePolicy::Accept,
182 restrict_chain_ids_to: None,
183 reject_message_bundles_without_application_ids: None,
184 reject_message_bundles_with_other_application_ids: None,
185 }
186 }
187
188 #[instrument(level = "trace", skip(self))]
189 fn apply(&self, mut bundle: IncomingBundle) -> Option<IncomingBundle> {
190 if let Some(chain_ids) = &self.restrict_chain_ids_to {
191 if !chain_ids.contains(&bundle.origin) {
192 return None;
193 }
194 }
195 if let Some(app_ids) = &self.reject_message_bundles_without_application_ids {
196 if !bundle
197 .messages()
198 .any(|posted_msg| app_ids.contains(&posted_msg.message.application_id()))
199 {
200 return None;
201 }
202 }
203 if let Some(app_ids) = &self.reject_message_bundles_with_other_application_ids {
204 if !bundle
205 .messages()
206 .all(|posted_msg| app_ids.contains(&posted_msg.message.application_id()))
207 {
208 return None;
209 }
210 }
211 if self.is_reject() {
212 if bundle.bundle.is_skippable() {
213 return None;
214 } else if !bundle.bundle.is_protected() {
215 bundle.action = MessageAction::Reject;
216 }
217 }
218 Some(bundle)
219 }
220
221 #[instrument(level = "trace", skip(self))]
222 fn is_ignore(&self) -> bool {
223 matches!(self.blanket, BlanketMessagePolicy::Ignore)
224 }
225
226 #[instrument(level = "trace", skip(self))]
227 fn is_reject(&self) -> bool {
228 matches!(self.blanket, BlanketMessagePolicy::Reject)
229 }
230}
231
232#[derive(Debug, Clone, Copy)]
233pub enum TimingType {
234 ExecuteOperations,
235 ExecuteBlock,
236 SubmitBlockProposal,
237 UpdateValidators,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
244pub enum ListeningMode {
245 FullChain,
248 FollowChain,
252 EventsOnly(BTreeSet<StreamId>),
254}
255
256impl PartialOrd for ListeningMode {
257 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
258 match (self, other) {
259 (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
260 (ListeningMode::FullChain, _) => Some(Ordering::Greater),
261 (_, ListeningMode::FullChain) => Some(Ordering::Less),
262 (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
263 (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
264 (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
265 (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
266 if events_a.is_superset(events_b) {
267 Some(Ordering::Greater)
268 } else if events_b.is_superset(events_a) {
269 Some(Ordering::Less)
270 } else {
271 None
272 }
273 }
274 }
275 }
276}
277
278impl ListeningMode {
279 pub fn is_relevant(&self, reason: &Reason) -> bool {
282 match (reason, self) {
283 (_, ListeningMode::FullChain) => true,
285 (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
288 (Reason::NewEvents { .. }, ListeningMode::FollowChain) => true,
289 (_, ListeningMode::FollowChain) => false,
290 (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
292 relevant.intersection(event_streams).next().is_some()
293 }
294 (_, ListeningMode::EventsOnly(_)) => false,
295 }
296 }
297
298 pub fn extend(&mut self, other: Option<ListeningMode>) {
299 match (self, other) {
300 (_, None) => (),
301 (ListeningMode::FullChain, _) => (),
302 (mode, Some(ListeningMode::FullChain)) => {
303 *mode = ListeningMode::FullChain;
304 }
305 (ListeningMode::FollowChain, _) => (),
306 (mode, Some(ListeningMode::FollowChain)) => {
307 *mode = ListeningMode::FollowChain;
308 }
309 (
310 ListeningMode::EventsOnly(self_events),
311 Some(ListeningMode::EventsOnly(other_events)),
312 ) => {
313 self_events.extend(other_events);
314 }
315 }
316 }
317}
318
319pub struct Client<Env: Environment> {
321 environment: Env,
322 pub local_node: LocalNodeClient<Env::Storage>,
325 requests_scheduler: RequestsScheduler<Env>,
327 admin_id: ChainId,
329 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
332 notifier: Arc<ChannelNotifier<Notification>>,
334 chains: papaya::HashMap<ChainId, chain_client::State>,
336 options: chain_client::Options,
338}
339
340impl<Env: Environment> Client<Env> {
341 #[instrument(level = "trace", skip_all)]
343 #[allow(clippy::too_many_arguments)]
344 pub fn new(
345 environment: Env,
346 admin_id: ChainId,
347 long_lived_services: bool,
348 tracked_chains: impl IntoIterator<Item = ChainId>,
349 name: impl Into<String>,
350 chain_worker_ttl: Duration,
351 sender_chain_worker_ttl: Duration,
352 options: chain_client::Options,
353 block_cache_size: usize,
354 execution_state_cache_size: usize,
355 requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
356 ) -> Self {
357 let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
358 let state = WorkerState::new_for_client(
359 name.into(),
360 environment.storage().clone(),
361 tracked_chains.clone(),
362 block_cache_size,
363 execution_state_cache_size,
364 )
365 .with_long_lived_services(long_lived_services)
366 .with_allow_inactive_chains(true)
367 .with_allow_messages_from_deprecated_epochs(true)
368 .with_chain_worker_ttl(chain_worker_ttl)
369 .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
370 let local_node = LocalNodeClient::new(state);
371 let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
372
373 Self {
374 environment,
375 local_node,
376 requests_scheduler,
377 chains: papaya::HashMap::new(),
378 admin_id,
379 tracked_chains,
380 notifier: Arc::new(ChannelNotifier::default()),
381 options,
382 }
383 }
384
385 pub fn admin_chain(&self) -> ChainId {
387 self.admin_id
388 }
389
390 pub fn storage_client(&self) -> &Env::Storage {
392 self.environment.storage()
393 }
394
395 pub fn validator_node_provider(&self) -> &Env::Network {
396 self.environment.network()
397 }
398
399 #[instrument(level = "trace", skip(self))]
401 pub fn signer(&self) -> &Env::Signer {
402 self.environment.signer()
403 }
404
405 pub fn wallet(&self) -> &Env::Wallet {
407 self.environment.wallet()
408 }
409
410 #[instrument(level = "trace", skip(self))]
412 pub fn track_chain(&self, chain_id: ChainId) {
413 self.tracked_chains
414 .write()
415 .expect("Panics should not happen while holding a lock to `tracked_chains`")
416 .insert(chain_id);
417 }
418
419 #[expect(clippy::too_many_arguments)]
421 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
422 pub fn create_chain_client(
423 self: &Arc<Self>,
424 chain_id: ChainId,
425 block_hash: Option<CryptoHash>,
426 next_block_height: BlockHeight,
427 pending_proposal: Option<PendingProposal>,
428 preferred_owner: Option<AccountOwner>,
429 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
430 follow_only: bool,
431 ) -> ChainClient<Env> {
432 self.chains.pin().get_or_insert_with(chain_id, || {
435 chain_client::State::new(pending_proposal.clone(), follow_only)
436 });
437
438 ChainClient::new(
439 self.clone(),
440 chain_id,
441 self.options.clone(),
442 block_hash,
443 next_block_height,
444 preferred_owner,
445 timing_sender,
446 )
447 }
448
449 fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
451 self.chains
452 .pin()
453 .get(&chain_id)
454 .is_some_and(|state| state.is_follow_only())
455 }
456
457 pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
459 self.chains.pin().update(chain_id, |state| {
460 let mut state = state.clone_for_update_unchecked();
461 state.set_follow_only(follow_only);
462 state
463 });
464 }
465
466 async fn fetch_chain_info(
468 &self,
469 chain_id: ChainId,
470 validators: &[RemoteNode<Env::ValidatorNode>],
471 ) -> Result<Box<ChainInfo>, chain_client::Error> {
472 match self.local_node.chain_info(chain_id).await {
473 Ok(info) => Ok(info),
474 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
475 self.synchronize_chain_state(self.admin_id).await?;
477 self.update_local_node_with_blobs_from(blob_ids, validators)
480 .await?;
481 Ok(self.local_node.chain_info(chain_id).await?)
482 }
483 Err(err) => Err(err.into()),
484 }
485 }
486
487 fn weighted_select(
488 remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
489 remaining_weights: &mut Vec<u64>,
490 ) -> Option<RemoteNode<Env::ValidatorNode>> {
491 if remaining_weights.is_empty() {
492 return None;
493 }
494 let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
495 let idx = dist.sample(&mut rand::thread_rng());
496 remaining_weights.remove(idx);
497 Some(remaining_validators.remove(idx))
498 }
499
500 #[instrument(level = "trace", skip(self))]
502 async fn download_certificates(
503 &self,
504 chain_id: ChainId,
505 target_next_block_height: BlockHeight,
506 ) -> Result<Box<ChainInfo>, chain_client::Error> {
507 let (_, committee) = self.admin_committee().await?;
508 let mut remaining_validators = self.make_nodes(&committee)?;
509 let mut info = self
510 .fetch_chain_info(chain_id, &remaining_validators)
511 .await?;
512 let mut remaining_weights = remaining_validators
514 .iter()
515 .map(|validator| {
516 let validator_state = committee.validators.get(&validator.public_key).unwrap();
517 validator_state.votes
518 })
519 .collect::<Vec<_>>();
520
521 while let Some(remote_node) =
522 Self::weighted_select(&mut remaining_validators, &mut remaining_weights)
523 {
524 if target_next_block_height <= info.next_block_height {
525 return Ok(info);
526 }
527 match self
528 .download_certificates_from(&remote_node, chain_id, target_next_block_height)
529 .await
530 {
531 Err(error) => info!(
532 remote_node = remote_node.address(),
533 %error,
534 "failed to download certificates from validator",
535 ),
536 Ok(Some(new_info)) => info = new_info,
537 Ok(None) => {}
538 }
539 }
540 ensure!(
541 target_next_block_height <= info.next_block_height,
542 chain_client::Error::CannotDownloadCertificates {
543 chain_id,
544 target_next_block_height,
545 }
546 );
547 Ok(info)
548 }
549
550 #[instrument(level = "trace", skip_all)]
553 async fn download_certificates_from(
554 &self,
555 remote_node: &RemoteNode<Env::ValidatorNode>,
556 chain_id: ChainId,
557 stop: BlockHeight,
558 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
559 let mut last_info = None;
560 let chain_info = self.local_node.chain_info(chain_id).await?;
562 let mut next_height = chain_info.next_block_height;
563 let hashes = self
564 .local_node
565 .get_preprocessed_block_hashes(chain_id, next_height, stop)
566 .await?;
567 let certificates = self
568 .storage_client()
569 .read_certificates(hashes.clone())
570 .await?;
571 let certificates = match ResultReadCertificates::new(certificates, hashes) {
572 ResultReadCertificates::Certificates(certificates) => certificates,
573 ResultReadCertificates::InvalidHashes(hashes) => {
574 return Err(chain_client::Error::ReadCertificatesError(hashes))
575 }
576 };
577 for certificate in certificates {
578 last_info = Some(self.handle_certificate(certificate).await?.info);
579 }
580 while next_height < stop {
582 let limit = u64::from(stop)
584 .checked_sub(u64::from(next_height))
585 .ok_or(ArithmeticError::Overflow)?
586 .min(self.options.certificate_download_batch_size);
587
588 let certificates = self
589 .requests_scheduler
590 .download_certificates(remote_node, chain_id, next_height, limit)
591 .await?;
592 let Some(info) = self.process_certificates(remote_node, certificates).await? else {
593 break;
594 };
595 assert!(info.next_block_height > next_height);
596 next_height = info.next_block_height;
597 last_info = Some(info);
598 }
599 Ok(last_info)
600 }
601
602 async fn download_blobs(
603 &self,
604 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
605 blob_ids: &[BlobId],
606 ) -> Result<(), chain_client::Error> {
607 let blobs = &self
608 .requests_scheduler
609 .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
610 .await?
611 .ok_or_else(|| {
612 chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
613 })?;
614 self.local_node.store_blobs(blobs).await.map_err(Into::into)
615 }
616
617 #[instrument(level = "trace", skip_all)]
620 async fn process_certificates(
621 &self,
622 remote_node: &RemoteNode<Env::ValidatorNode>,
623 certificates: Vec<ConfirmedBlockCertificate>,
624 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
625 let mut info = None;
626 let required_blob_ids: Vec<_> = certificates
627 .iter()
628 .flat_map(|certificate| certificate.value().required_blob_ids())
629 .collect();
630
631 match self
632 .local_node
633 .read_blob_states_from_storage(&required_blob_ids)
634 .await
635 {
636 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
637 self.download_blobs(&[remote_node.clone()], &blob_ids)
638 .await?;
639 }
640 x => {
641 x?;
642 }
643 }
644
645 for certificate in certificates {
646 info = Some(
647 match self.handle_certificate(certificate.clone()).await {
648 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
649 self.download_blobs(&[remote_node.clone()], &blob_ids)
650 .await?;
651 self.handle_certificate(certificate).await?
652 }
653 x => x?,
654 }
655 .info,
656 );
657 }
658
659 Ok(info)
661 }
662
663 async fn handle_certificate<T: ProcessableCertificate>(
664 &self,
665 certificate: GenericCertificate<T>,
666 ) -> Result<ChainInfoResponse, LocalNodeError> {
667 self.local_node
668 .handle_certificate(certificate, &self.notifier)
669 .await
670 }
671
672 async fn chain_info_with_committees(
673 &self,
674 chain_id: ChainId,
675 ) -> Result<Box<ChainInfo>, LocalNodeError> {
676 let query = ChainInfoQuery::new(chain_id).with_committees();
677 let info = self.local_node.handle_chain_info_query(query).await?.info;
678 Ok(info)
679 }
680
681 #[instrument(level = "trace", skip_all)]
684 async fn admin_committees(
685 &self,
686 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
687 let info = self.chain_info_with_committees(self.admin_id).await?;
688 Ok((info.epoch, info.into_committees()?))
689 }
690
691 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
693 let info = self.chain_info_with_committees(self.admin_id).await?;
694 Ok((info.epoch, info.into_current_committee()?))
695 }
696
697 async fn validator_nodes(
699 &self,
700 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
701 let (_, committee) = self.admin_committee().await?;
702 Ok(self.make_nodes(&committee)?)
703 }
704
705 fn make_nodes(
707 &self,
708 committee: &Committee,
709 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
710 Ok(self
711 .validator_node_provider()
712 .make_nodes(committee)?
713 .map(|(public_key, node)| RemoteNode { public_key, node })
714 .collect())
715 }
716
717 pub async fn get_chain_description(
720 &self,
721 chain_id: ChainId,
722 ) -> Result<ChainDescription, chain_client::Error> {
723 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
724 let blob = self
725 .local_node
726 .storage_client()
727 .read_blob(chain_desc_id)
728 .await?;
729 if let Some(blob) = blob {
730 return Ok(bcs::from_bytes(blob.bytes())?);
732 };
733 self.synchronize_chain_state(self.admin_id).await?;
735 let nodes = self.validator_nodes().await?;
736 let blob = self
737 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
738 .await?
739 .pop()
740 .unwrap(); Ok(bcs::from_bytes(blob.bytes())?)
742 }
743
744 #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
746 fn update_from_info(&self, info: &ChainInfo) {
747 self.chains.pin().update(info.chain_id, |state| {
748 let mut state = state.clone_for_update_unchecked();
749 state.update_from_info(info);
750 state
751 });
752 }
753
754 #[instrument(level = "trace", skip_all)]
756 async fn process_certificate<T: ProcessableCertificate>(
757 &self,
758 certificate: Box<GenericCertificate<T>>,
759 ) -> Result<(), LocalNodeError> {
760 let info = self.handle_certificate(*certificate).await?.info;
761 self.update_from_info(&info);
762 Ok(())
763 }
764
765 #[instrument(level = "trace", skip_all)]
767 pub(crate) async fn finalize_block(
768 self: &Arc<Self>,
769 committee: &Committee,
770 certificate: ValidatedBlockCertificate,
771 ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
772 debug!(round = %certificate.round, "Submitting block for confirmation");
773 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
774 let finalize_action = CommunicateAction::FinalizeBlock {
775 certificate: Box::new(certificate),
776 delivery: self.options.cross_chain_message_delivery,
777 };
778 let certificate = self
779 .communicate_chain_action(committee, finalize_action, hashed_value)
780 .await?;
781 self.receive_certificate_with_checked_signatures(certificate.clone())
782 .await?;
783 Ok(certificate)
784 }
785
786 #[instrument(level = "trace", skip_all)]
788 pub(crate) async fn submit_block_proposal<T: ProcessableCertificate>(
789 self: &Arc<Self>,
790 committee: &Committee,
791 proposal: Box<BlockProposal>,
792 value: T,
793 ) -> Result<GenericCertificate<T>, chain_client::Error> {
794 use linera_storage::Clock as _;
795
796 debug!(
797 round = %proposal.content.round,
798 "Submitting block proposal to validators"
799 );
800
801 let block_timestamp = proposal.content.block.timestamp;
803 let local_time = self.local_node.storage_client().clock().current_time();
804 if block_timestamp > local_time {
805 info!(
806 chain_id = %proposal.content.block.chain_id,
807 %block_timestamp,
808 %local_time,
809 "Block timestamp is in the future; waiting for validators",
810 );
811 }
812
813 let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
815 let submit_action = CommunicateAction::SubmitBlock {
816 proposal,
817 blob_ids: value.required_blob_ids().into_iter().collect(),
818 clock_skew_sender,
819 };
820
821 let validity_threshold = committee.validity_threshold();
823 let committee_clone = committee.clone();
824 let clock_skew_check_handle = linera_base::task::spawn(async move {
825 let mut skew_weight = 0u64;
826 let mut min_skew = TimeDelta::MAX;
827 let mut max_skew = TimeDelta::ZERO;
828 while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
829 if clock_skew.as_micros() > 0 {
830 skew_weight += committee_clone.weight(&public_key);
831 min_skew = min_skew.min(clock_skew);
832 max_skew = max_skew.max(clock_skew);
833 if skew_weight >= validity_threshold {
834 warn!(
835 skew_weight,
836 validity_threshold,
837 min_skew_ms = min_skew.as_micros() / 1000,
838 max_skew_ms = max_skew.as_micros() / 1000,
839 "A validity threshold of validators reported clock skew; \
840 consider checking your system clock",
841 );
842 return;
843 }
844 }
845 }
846 });
847
848 let certificate = self
849 .communicate_chain_action(committee, submit_action, value)
850 .await?;
851
852 clock_skew_check_handle.await;
853
854 self.process_certificate(Box::new(certificate.clone()))
855 .await?;
856 Ok(certificate)
857 }
858
859 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
861 async fn communicate_chain_updates(
862 self: &Arc<Self>,
863 committee: &Committee,
864 chain_id: ChainId,
865 height: BlockHeight,
866 delivery: CrossChainMessageDelivery,
867 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
868 ) -> Result<(), chain_client::Error> {
869 let nodes = self.make_nodes(committee)?;
870 communicate_with_quorum(
871 &nodes,
872 committee,
873 |_: &()| (),
874 |remote_node| {
875 let mut updater = ValidatorUpdater {
876 remote_node,
877 client: self.clone(),
878 admin_id: self.admin_id,
879 };
880 let certificate = latest_certificate.clone();
881 Box::pin(async move {
882 updater
883 .send_chain_information(chain_id, height, delivery, certificate)
884 .await
885 })
886 },
887 self.options.quorum_grace_period,
888 )
889 .await?;
890 Ok(())
891 }
892
893 #[instrument(level = "trace", skip_all)]
899 async fn communicate_chain_action<T: CertificateValue>(
900 self: &Arc<Self>,
901 committee: &Committee,
902 action: CommunicateAction,
903 value: T,
904 ) -> Result<GenericCertificate<T>, chain_client::Error> {
905 let nodes = self.make_nodes(committee)?;
906 let ((votes_hash, votes_round), votes) = communicate_with_quorum(
907 &nodes,
908 committee,
909 |vote: &LiteVote| (vote.value.value_hash, vote.round),
910 |remote_node| {
911 let mut updater = ValidatorUpdater {
912 remote_node,
913 client: self.clone(),
914 admin_id: self.admin_id,
915 };
916 let action = action.clone();
917 Box::pin(async move { updater.send_chain_update(action).await })
918 },
919 self.options.quorum_grace_period,
920 )
921 .await?;
922 ensure!(
923 (votes_hash, votes_round) == (value.hash(), action.round()),
924 chain_client::Error::UnexpectedQuorum {
925 hash: votes_hash,
926 round: votes_round,
927 expected_hash: value.hash(),
928 expected_round: action.round(),
929 }
930 );
931 let certificate = LiteCertificate::try_from_votes(votes)
936 .ok_or_else(|| {
937 chain_client::Error::InternalError(
938 "Vote values or rounds don't match; this is a bug",
939 )
940 })?
941 .with_value(value)
942 .ok_or_else(|| {
943 chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
944 })?;
945 Ok(certificate)
946 }
947
948 #[instrument(level = "trace", skip_all)]
951 async fn receive_certificate_with_checked_signatures(
952 &self,
953 certificate: ConfirmedBlockCertificate,
954 ) -> Result<(), chain_client::Error> {
955 let certificate = Box::new(certificate);
956 let block = certificate.block();
957 self.download_certificates(block.header.chain_id, block.header.height)
959 .await?;
960 if let Err(err) = self.process_certificate(certificate.clone()).await {
963 match &err {
964 LocalNodeError::BlobsNotFound(blob_ids) => {
965 self.download_blobs(&self.validator_nodes().await?, blob_ids)
966 .await
967 .map_err(|_| err)?;
968 self.process_certificate(certificate).await?;
969 }
970 _ => {
971 warn!("Failed to process network hashed certificate value");
973 return Err(err.into());
974 }
975 }
976 }
977
978 Ok(())
979 }
980
981 #[instrument(level = "trace", skip_all)]
983 #[allow(dead_code)] async fn receive_sender_certificate(
985 &self,
986 certificate: ConfirmedBlockCertificate,
987 mode: ReceiveCertificateMode,
988 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
989 ) -> Result<(), chain_client::Error> {
990 let (max_epoch, committees) = self.admin_committees().await?;
992 if let ReceiveCertificateMode::NeedsCheck = mode {
993 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
994 }
995 let nodes = if let Some(nodes) = nodes {
997 nodes
998 } else {
999 self.validator_nodes().await?
1000 };
1001 if let Err(err) = self.handle_certificate(certificate.clone()).await {
1002 match &err {
1003 LocalNodeError::BlobsNotFound(blob_ids) => {
1004 self.download_blobs(&nodes, blob_ids).await?;
1005 self.handle_certificate(certificate.clone()).await?;
1006 }
1007 _ => {
1008 warn!("Failed to process network hashed certificate value");
1010 return Err(err.into());
1011 }
1012 }
1013 }
1014
1015 Ok(())
1016 }
1017
1018 #[instrument(level = "trace", skip_all)]
1020 async fn download_and_process_sender_chain(
1021 &self,
1022 sender_chain_id: ChainId,
1023 nodes: &[RemoteNode<Env::ValidatorNode>],
1024 received_log: &ReceivedLogs,
1025 mut remote_heights: Vec<BlockHeight>,
1026 sender: mpsc::UnboundedSender<ChainAndHeight>,
1027 ) {
1028 let (max_epoch, committees) = match self.admin_committees().await {
1029 Ok(result) => result,
1030 Err(error) => {
1031 error!(%error, %sender_chain_id, "could not read admin committees");
1032 return;
1033 }
1034 };
1035 let committees_ref = &committees;
1036 let mut nodes = nodes.to_vec();
1037 while !remote_heights.is_empty() {
1038 let remote_heights_ref = &remote_heights;
1039 nodes.shuffle(&mut rand::thread_rng());
1040 let certificates = match communicate_concurrently(
1041 &nodes,
1042 async move |remote_node| {
1043 let mut remote_heights = remote_heights_ref.clone();
1044 remote_heights.retain(|height| {
1047 received_log.validator_has_block(
1048 &remote_node.public_key,
1049 sender_chain_id,
1050 *height,
1051 )
1052 });
1053 if remote_heights.is_empty() {
1054 return Err(());
1057 }
1058 let certificates = self
1059 .requests_scheduler
1060 .download_certificates_by_heights(
1061 &remote_node,
1062 sender_chain_id,
1063 remote_heights,
1064 )
1065 .await
1066 .map_err(|_| ())?;
1067 let mut certificates_with_check_results = vec![];
1068 for cert in certificates {
1069 if let Ok(check_result) =
1070 Self::check_certificate(max_epoch, committees_ref, &cert)
1071 {
1072 certificates_with_check_results
1073 .push((cert, check_result.into_result().is_ok()));
1074 } else {
1075 return Err(());
1077 }
1078 }
1079 Ok(certificates_with_check_results)
1080 },
1081 |errors| {
1082 errors
1083 .into_iter()
1084 .map(|(validator, _error)| validator)
1085 .collect::<BTreeSet<_>>()
1086 },
1087 self.options.certificate_batch_download_timeout,
1088 )
1089 .await
1090 {
1091 Ok(certificates_with_check_results) => certificates_with_check_results,
1092 Err(faulty_validators) => {
1093 nodes.retain(|node| !faulty_validators.contains(&node.public_key));
1095 if nodes.is_empty() {
1096 info!(
1097 chain_id = %sender_chain_id,
1098 "could not download certificates for chain - no more correct validators left"
1099 );
1100 return;
1101 }
1102 continue;
1103 }
1104 };
1105
1106 trace!(
1107 chain_id = %sender_chain_id,
1108 num_certificates = %certificates.len(),
1109 "received certificates",
1110 );
1111
1112 let mut to_remove_from_queue = BTreeSet::new();
1113
1114 for (certificate, check_result) in certificates {
1115 let hash = certificate.hash();
1116 let chain_id = certificate.block().header.chain_id;
1117 let height = certificate.block().header.height;
1118 if !check_result {
1119 to_remove_from_queue.insert(height);
1123 continue;
1124 }
1125 let mode = ReceiveCertificateMode::AlreadyChecked;
1127 if let Err(error) = self
1128 .receive_sender_certificate(certificate, mode, None)
1129 .await
1130 {
1131 warn!(%error, %hash, "Received invalid certificate");
1132 } else {
1133 to_remove_from_queue.insert(height);
1134 if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
1135 error!(
1136 %chain_id,
1137 %height,
1138 %error,
1139 "failed to send chain and height over the channel",
1140 );
1141 }
1142 }
1143 }
1144
1145 remote_heights.retain(|height| !to_remove_from_queue.contains(height));
1146 }
1147 trace!(
1148 chain_id = %sender_chain_id,
1149 "find_received_certificates: finished processing chain",
1150 );
1151 }
1152
1153 #[instrument(level = "trace", skip(self))]
1155 async fn get_received_log_from_validator(
1156 &self,
1157 chain_id: ChainId,
1158 remote_node: &RemoteNode<Env::ValidatorNode>,
1159 tracker: u64,
1160 ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1161 let mut offset = tracker;
1162
1163 let mut remote_log = Vec::new();
1165 loop {
1166 trace!("get_received_log_from_validator: looping");
1167 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1168 let info = remote_node.handle_chain_info_query(query).await?;
1169 let received_entries = info.requested_received_log.len();
1170 offset += received_entries as u64;
1171 remote_log.extend(info.requested_received_log);
1172 trace!(
1173 remote_node = remote_node.address(),
1174 %received_entries,
1175 "get_received_log_from_validator: received log batch",
1176 );
1177 if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1178 break;
1179 }
1180 }
1181
1182 trace!(
1183 remote_node = remote_node.address(),
1184 num_entries = remote_log.len(),
1185 "get_received_log_from_validator: returning downloaded log",
1186 );
1187
1188 Ok(remote_log)
1189 }
1190
1191 async fn download_sender_block_with_sending_ancestors(
1197 &self,
1198 receiver_chain_id: ChainId,
1199 sender_chain_id: ChainId,
1200 height: BlockHeight,
1201 remote_node: &RemoteNode<Env::ValidatorNode>,
1202 ) -> Result<(), chain_client::Error> {
1203 let next_outbox_height = self
1204 .local_node
1205 .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1206 .await?
1207 .get(&sender_chain_id)
1208 .copied()
1209 .unwrap_or(BlockHeight::ZERO);
1210 let (max_epoch, committees) = self.admin_committees().await?;
1211
1212 let mut certificates = BTreeMap::new();
1215 let mut current_height = height;
1216
1217 while current_height >= next_outbox_height {
1219 let downloaded = self
1221 .requests_scheduler
1222 .download_certificates_by_heights(
1223 remote_node,
1224 sender_chain_id,
1225 vec![current_height],
1226 )
1227 .await?;
1228 let Some(certificate) = downloaded.into_iter().next() else {
1229 return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1230 chain_id: sender_chain_id,
1231 height: current_height,
1232 });
1233 };
1234
1235 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1237 .into_result()?;
1238
1239 let block = certificate.block();
1241 let next_height = block
1242 .body
1243 .previous_message_blocks
1244 .get(&receiver_chain_id)
1245 .map(|(_prev_hash, prev_height)| *prev_height);
1246
1247 certificates.insert(current_height, certificate);
1249
1250 if let Some(prev_height) = next_height {
1251 current_height = prev_height;
1253 } else {
1254 break;
1256 }
1257 }
1258
1259 if certificates.is_empty() {
1260 self.local_node
1261 .retry_pending_cross_chain_requests(sender_chain_id)
1262 .await?;
1263 }
1264
1265 for certificate in certificates.into_values() {
1267 self.receive_sender_certificate(
1268 certificate,
1269 ReceiveCertificateMode::AlreadyChecked,
1270 Some(vec![remote_node.clone()]),
1271 )
1272 .await?;
1273 }
1274
1275 Ok(())
1276 }
1277
1278 #[instrument(
1279 level = "trace", skip_all,
1280 fields(certificate_hash = ?incoming_certificate.hash()),
1281 )]
1282 fn check_certificate(
1283 highest_known_epoch: Epoch,
1284 committees: &BTreeMap<Epoch, Committee>,
1285 incoming_certificate: &ConfirmedBlockCertificate,
1286 ) -> Result<CheckCertificateResult, NodeError> {
1287 let block = incoming_certificate.block();
1288 if block.header.epoch > highest_known_epoch {
1290 return Ok(CheckCertificateResult::FutureEpoch);
1291 }
1292 if let Some(known_committee) = committees.get(&block.header.epoch) {
1293 incoming_certificate.check(known_committee)?;
1296 Ok(CheckCertificateResult::New)
1297 } else {
1298 Ok(CheckCertificateResult::OldEpoch)
1300 }
1301 }
1302
1303 #[instrument(level = "trace", skip_all)]
1307 async fn synchronize_chain_state(
1308 &self,
1309 chain_id: ChainId,
1310 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1311 let (_, committee) = self.admin_committee().await?;
1312 self.synchronize_chain_from_committee(chain_id, committee)
1313 .await
1314 }
1315
1316 #[instrument(level = "trace", skip_all)]
1321 pub(crate) async fn synchronize_chain_from_committee(
1322 &self,
1323 chain_id: ChainId,
1324 committee: Committee,
1325 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1326 #[cfg(with_metrics)]
1327 let _latency = if !self.is_chain_follow_only(chain_id) {
1328 Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1329 } else {
1330 None
1331 };
1332
1333 let validators = self.make_nodes(&committee)?;
1334 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1335 communicate_with_quorum(
1336 &validators,
1337 &committee,
1338 |_: &()| (),
1339 |remote_node| async move {
1340 self.synchronize_chain_state_from(&remote_node, chain_id)
1341 .await
1342 },
1343 self.options.quorum_grace_period,
1344 )
1345 .await?;
1346
1347 self.local_node
1348 .chain_info(chain_id)
1349 .await
1350 .map_err(Into::into)
1351 }
1352
1353 #[instrument(level = "trace", skip(self, remote_node, chain_id))]
1359 pub(crate) async fn synchronize_chain_state_from(
1360 &self,
1361 remote_node: &RemoteNode<Env::ValidatorNode>,
1362 chain_id: ChainId,
1363 ) -> Result<(), chain_client::Error> {
1364 let with_manager_values = !self.is_chain_follow_only(chain_id);
1365 let query = if with_manager_values {
1366 ChainInfoQuery::new(chain_id).with_manager_values()
1367 } else {
1368 ChainInfoQuery::new(chain_id)
1369 };
1370 let remote_info = remote_node.handle_chain_info_query(query).await?;
1371 let local_info = self
1372 .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
1373 .await?;
1374
1375 if !with_manager_values {
1376 return Ok(());
1377 }
1378
1379 let local_height = match local_info {
1381 Some(info) => info.next_block_height,
1382 None => {
1383 self.local_node
1384 .chain_info(chain_id)
1385 .await?
1386 .next_block_height
1387 }
1388 };
1389 if local_height != remote_info.next_block_height {
1390 debug!(
1391 remote_node = remote_node.address(),
1392 remote_height = %remote_info.next_block_height,
1393 local_height = %local_height,
1394 "synced from validator, but remote height and local height are different",
1395 );
1396 return Ok(());
1397 };
1398
1399 if let Some(timeout) = remote_info.manager.timeout {
1400 self.handle_certificate(*timeout).await?;
1401 }
1402 let mut proposals = Vec::new();
1403 if let Some(proposal) = remote_info.manager.requested_signed_proposal {
1404 proposals.push(*proposal);
1405 }
1406 if let Some(proposal) = remote_info.manager.requested_proposed {
1407 proposals.push(*proposal);
1408 }
1409 if let Some(locking) = remote_info.manager.requested_locking {
1410 match *locking {
1411 LockingBlock::Fast(proposal) => {
1412 proposals.push(proposal);
1413 }
1414 LockingBlock::Regular(cert) => {
1415 let hash = cert.hash();
1416 if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
1417 {
1418 debug!(
1419 remote_node = remote_node.address(),
1420 %hash,
1421 height = %local_height,
1422 %error,
1423 "skipping locked block from validator",
1424 );
1425 }
1426 }
1427 }
1428 }
1429 'proposal_loop: for proposal in proposals {
1430 let owner: AccountOwner = proposal.owner();
1431 if let Err(mut err) = self
1432 .local_node
1433 .handle_block_proposal(proposal.clone())
1434 .await
1435 {
1436 if let LocalNodeError::BlobsNotFound(_) = &err {
1437 let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
1438 if !required_blob_ids.is_empty() {
1439 let mut blobs = Vec::new();
1440 for blob_id in required_blob_ids {
1441 let blob_content = match self
1442 .requests_scheduler
1443 .download_pending_blob(remote_node, chain_id, blob_id)
1444 .await
1445 {
1446 Ok(content) => content,
1447 Err(error) => {
1448 info!(
1449 remote_node = remote_node.address(),
1450 height = %local_height,
1451 proposer = %owner,
1452 %blob_id,
1453 %error,
1454 "skipping proposal from validator; failed to download blob",
1455 );
1456 continue 'proposal_loop;
1457 }
1458 };
1459 blobs.push(Blob::new(blob_content));
1460 }
1461 self.local_node
1462 .handle_pending_blobs(chain_id, blobs)
1463 .await?;
1464 if let Err(new_err) = self
1466 .local_node
1467 .handle_block_proposal(proposal.clone())
1468 .await
1469 {
1470 err = new_err;
1471 } else {
1472 continue;
1473 }
1474 }
1475 if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
1476 self.update_local_node_with_blobs_from(
1477 blob_ids.clone(),
1478 &[remote_node.clone()],
1479 )
1480 .await?;
1481 if let Err(new_err) = self
1483 .local_node
1484 .handle_block_proposal(proposal.clone())
1485 .await
1486 {
1487 err = new_err;
1488 } else {
1489 continue;
1490 }
1491 }
1492 }
1493 while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
1494 if let ChainError::MissingCrossChainUpdate {
1495 chain_id,
1496 origin,
1497 height,
1498 } = &**chain_err
1499 {
1500 self.download_sender_block_with_sending_ancestors(
1501 *chain_id,
1502 *origin,
1503 *height,
1504 remote_node,
1505 )
1506 .await?;
1507 if let Err(new_err) = self
1509 .local_node
1510 .handle_block_proposal(proposal.clone())
1511 .await
1512 {
1513 err = new_err;
1514 } else {
1515 continue 'proposal_loop;
1516 }
1517 } else {
1518 break;
1519 }
1520 }
1521
1522 debug!(
1523 remote_node = remote_node.address(),
1524 proposer = %owner,
1525 height = %local_height,
1526 error = %err,
1527 "skipping proposal from validator",
1528 );
1529 }
1530 }
1531 Ok(())
1532 }
1533
1534 async fn try_process_locking_block_from(
1535 &self,
1536 remote_node: &RemoteNode<Env::ValidatorNode>,
1537 certificate: GenericCertificate<ValidatedBlock>,
1538 ) -> Result<(), chain_client::Error> {
1539 let chain_id = certificate.inner().chain_id();
1540 let certificate = Box::new(certificate);
1541 match self.process_certificate(certificate.clone()).await {
1542 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1543 let mut blobs = Vec::new();
1544 for blob_id in blob_ids {
1545 let blob_content = self
1546 .requests_scheduler
1547 .download_pending_blob(remote_node, chain_id, blob_id)
1548 .await?;
1549 blobs.push(Blob::new(blob_content));
1550 }
1551 self.local_node
1552 .handle_pending_blobs(chain_id, blobs)
1553 .await?;
1554 self.process_certificate(certificate).await?;
1555 Ok(())
1556 }
1557 Err(err) => Err(err.into()),
1558 Ok(()) => Ok(()),
1559 }
1560 }
1561
1562 async fn update_local_node_with_blobs_from(
1565 &self,
1566 blob_ids: Vec<BlobId>,
1567 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1568 ) -> Result<Vec<Blob>, chain_client::Error> {
1569 let timeout = self.options.blob_download_timeout;
1570 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1572 stream::iter(blob_ids.into_iter().map(|blob_id| {
1573 communicate_concurrently(
1574 remote_nodes,
1575 async move |remote_node| {
1576 let certificate = self
1577 .requests_scheduler
1578 .download_certificate_for_blob(&remote_node, blob_id)
1579 .await?;
1580 self.receive_sender_certificate(
1581 certificate,
1582 ReceiveCertificateMode::NeedsCheck,
1583 Some(vec![remote_node.clone()]),
1584 )
1585 .await?;
1586 let blob = self
1587 .local_node
1588 .storage_client()
1589 .read_blob(blob_id)
1590 .await?
1591 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1592 Result::<_, chain_client::Error>::Ok(blob)
1593 },
1594 move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
1595 timeout,
1596 )
1597 }))
1598 .buffer_unordered(self.options.max_joined_tasks)
1599 .collect::<Vec<_>>()
1600 .await
1601 .into_iter()
1602 .collect()
1603 }
1604
1605 #[tracing::instrument(level = "trace", skip(self, block))]
1610 async fn stage_block_execution_and_discard_failing_messages(
1611 &self,
1612 mut block: ProposedBlock,
1613 round: Option<u32>,
1614 published_blobs: Vec<Blob>,
1615 ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1616 loop {
1617 let result = self
1618 .stage_block_execution(block.clone(), round, published_blobs.clone())
1619 .await;
1620 if let Err(chain_client::Error::LocalNodeError(LocalNodeError::WorkerError(
1621 WorkerError::ChainError(chain_error),
1622 ))) = &result
1623 {
1624 if let ChainError::ExecutionError(
1625 error,
1626 ChainExecutionContext::IncomingBundle(index),
1627 ) = &**chain_error
1628 {
1629 let transaction = block
1630 .transactions
1631 .get_mut(*index as usize)
1632 .expect("Transaction at given index should exist");
1633 let Transaction::ReceiveMessages(incoming_bundle) = transaction else {
1634 panic!(
1635 "Expected incoming bundle at transaction index {}, found operation",
1636 index
1637 );
1638 };
1639 ensure!(
1640 !incoming_bundle.bundle.is_protected(),
1641 chain_client::Error::BlockProposalError(
1642 "Protected incoming message failed to execute locally"
1643 )
1644 );
1645 if incoming_bundle.action == MessageAction::Reject {
1646 return result;
1647 }
1648 info!(
1652 %error, %index, origin = ?incoming_bundle.origin,
1653 "Message bundle failed to execute locally and will be rejected."
1654 );
1655 incoming_bundle.action = MessageAction::Reject;
1656 continue;
1657 }
1658 }
1659 return result;
1660 }
1661 }
1662
1663 #[instrument(level = "trace", skip(self, block))]
1666 async fn stage_block_execution(
1667 &self,
1668 block: ProposedBlock,
1669 round: Option<u32>,
1670 published_blobs: Vec<Blob>,
1671 ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1672 loop {
1673 let result = self
1674 .local_node
1675 .stage_block_execution(block.clone(), round, published_blobs.clone())
1676 .await;
1677 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1678 let validators = self.validator_nodes().await?;
1679 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1680 .await?;
1681 continue; }
1683 if let Ok((block, _)) = &result {
1684 let hash = CryptoHash::new(block);
1685 let notification = Notification {
1686 chain_id: block.header.chain_id,
1687 reason: Reason::BlockExecuted {
1688 height: block.header.height,
1689 hash,
1690 },
1691 };
1692 self.notifier.notify(&[notification]);
1693 }
1694 return Ok(result?);
1695 }
1696 }
1697}
1698
1699async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
1702 nodes: &[RemoteNode<A>],
1703 f: F,
1704 err: G,
1705 timeout: Duration,
1706) -> Result<V, E2>
1707where
1708 F: Clone + FnOnce(RemoteNode<A>) -> R,
1709 RemoteNode<A>: Clone,
1710 G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
1711 R: Future<Output = Result<V, E1>> + 'a,
1712{
1713 let mut stream = nodes
1714 .iter()
1715 .zip(0..)
1716 .map(|(remote_node, i)| {
1717 let fun = f.clone();
1718 let node = remote_node.clone();
1719 async move {
1720 linera_base::time::timer::sleep(timeout * i * i).await;
1721 fun(node).await.map_err(|err| (remote_node.public_key, err))
1722 }
1723 })
1724 .collect::<FuturesUnordered<_>>();
1725 let mut errors = vec![];
1726 while let Some(maybe_result) = stream.next().await {
1727 match maybe_result {
1728 Ok(result) => return Ok(result),
1729 Err(error) => errors.push(error),
1730 };
1731 }
1732 Err(err(errors))
1733}
1734
1735#[derive(Debug)]
1737enum ExecuteBlockOutcome {
1738 Executed(ConfirmedBlockCertificate),
1740 Conflict(ConfirmedBlockCertificate),
1743 WaitForTimeout(RoundTimeout),
1746}
1747
1748#[must_use]
1750pub struct AbortOnDrop(pub AbortHandle);
1751
1752impl Drop for AbortOnDrop {
1753 #[instrument(level = "trace", skip(self))]
1754 fn drop(&mut self) {
1755 self.0.abort();
1756 }
1757}
1758
1759#[derive(Clone, Serialize, Deserialize)]
1761pub struct PendingProposal {
1762 pub block: ProposedBlock,
1763 pub blobs: Vec<Blob>,
1764}
1765
1766enum ReceiveCertificateMode {
1767 NeedsCheck,
1768 AlreadyChecked,
1769}
1770
1771enum CheckCertificateResult {
1772 OldEpoch,
1773 New,
1774 FutureEpoch,
1775}
1776
1777impl CheckCertificateResult {
1778 fn into_result(self) -> Result<(), chain_client::Error> {
1779 match self {
1780 Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
1781 Self::New => Ok(()),
1782 Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
1783 }
1784 }
1785}
1786
1787#[cfg(not(target_arch = "wasm32"))]
1789pub async fn create_bytecode_blobs(
1790 contract: Bytecode,
1791 service: Bytecode,
1792 vm_runtime: VmRuntime,
1793) -> (Vec<Blob>, ModuleId) {
1794 match vm_runtime {
1795 VmRuntime::Wasm => {
1796 let (compressed_contract, compressed_service) =
1797 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
1798 .await
1799 .expect("Compression should not panic");
1800 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1801 let service_blob = Blob::new_service_bytecode(compressed_service);
1802 let module_id =
1803 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1804 (vec![contract_blob, service_blob], module_id)
1805 }
1806 VmRuntime::Evm => {
1807 let compressed_contract = contract.compress();
1808 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1809 let module_id = ModuleId::new(
1810 evm_contract_blob.id().hash,
1811 evm_contract_blob.id().hash,
1812 vm_runtime,
1813 );
1814 (vec![evm_contract_blob], module_id)
1815 }
1816 }
1817}