1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10 sync::Arc,
11};
12
13use futures::{future, Future, StreamExt};
14use linera_base::{
15 crypto::ValidatorPublicKey,
16 data_types::{BlockHeight, Round, TimeDelta},
17 ensure,
18 identifiers::{BlobId, BlobType, ChainId, StreamId},
19 time::{timer::timeout, Duration, Instant},
20};
21use linera_chain::{
22 data_types::{BlockProposal, LiteVote},
23 manager::LockingBlock,
24 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
25};
26use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME, BlobOrigin};
27use linera_storage::{Arc as CacheArc, Clock, Storage};
28use thiserror::Error;
29use tokio::sync::mpsc;
30use tracing::{instrument, Level};
31
32use crate::{
33 client::{chain_client, Client},
34 data_types::{ChainInfo, ChainInfoQuery},
35 environment::Environment,
36 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
37 remote_node::RemoteNode,
38 LocalNodeError,
39};
40
/// Default value for a quorum grace period factor.
///
/// NOTE(review): presumably this is the value callers pass as `quorum_grace_period` to
/// [`communicate_with_quorum`], where the factor scales the time already spent reaching a
/// quorum into an additional waiting window — confirm against the callers.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;

/// A validator's public key paired with a time delta, as sent over the
/// `clock_skew_sender` channel of [`CommunicateAction::SubmitBlock`].
pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
/// How long [`communicate_with_quorum`] waits for the next validator response while no
/// grace-period deadline has been set (24 hours).
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);

/// A request to send to a validator through [`ValidatorUpdater::send_chain_update`].
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal and obtain the validator's vote on it.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs consulted when the validator reports missing blobs or an inactive chain.
        blob_ids: Vec<BlobId>,
        /// Channel on which the clock skew observed for a validator is reported.
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    },
    /// Send a validated-block certificate and obtain the validator's vote.
    FinalizeBlock {
        /// The certificate to send.
        certificate: Box<ValidatedBlockCertificate>,
        /// Cross-chain delivery mode passed along with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask the validator for a timeout vote.
    RequestTimeout {
        /// The chain the timeout is requested for.
        chain_id: ChainId,
        /// The block height the timeout is requested at.
        height: BlockHeight,
        /// The round the timeout is requested for.
        round: Round,
    },
}
69
70impl CommunicateAction {
71 pub fn round(&self) -> Round {
73 match self {
74 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
75 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
76 CommunicateAction::RequestTimeout { round, .. } => *round,
77 }
78 }
79}
80
/// Sends chain data to a single validator and collects its responses.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator this updater communicates with.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The local client; its local node and storage supply the data sent to the validator.
    pub client: Arc<Client<Env>>,
    /// ID of the admin chain, which is uploaded when the validator lacks its epoch events.
    pub admin_chain_id: ChainId,
}
89
90impl<Env: Environment> Clone for ValidatorUpdater<Env> {
91 fn clone(&self) -> Self {
92 ValidatorUpdater {
93 remote_node: self.remote_node.clone(),
94 client: self.client.clone(),
95 admin_chain_id: self.admin_chain_id,
96 }
97 }
98}
99
/// The ways in which [`communicate_with_quorum`] can fail to obtain a quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No group of responses reached the quorum threshold and no error was recorded.
    ///
    /// Carries the quorum threshold and, for every group of responses, its total weight
    /// and its number of responses.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// The highest-weight error; its weight reached the committee's validity threshold.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// Up to four of the highest-weight errors, each with its accumulated weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
118
119pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
126 validator_clients: &'a [RemoteNode<A>],
127 committee: &Committee,
128 group_by: G,
129 execute: F,
130 quorum_grace_period: f64,
132) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
133where
134 A: ValidatorNode + Clone + 'static,
135 F: Clone + Fn(RemoteNode<A>) -> R,
136 R: Future<Output = Result<V, chain_client::Error>> + 'a,
137 G: Fn(&V) -> K,
138 K: Hash + PartialEq + Eq + Clone + 'static,
139 V: 'static,
140{
141 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
142 .iter()
143 .filter_map(|remote_node| {
144 if committee.weight(&remote_node.public_key) == 0 {
145 return None;
148 }
149 let execute = execute.clone();
150 let remote_node = remote_node.clone();
151 Some(async move { (remote_node.public_key, execute(remote_node).await) })
152 })
153 .collect();
154
155 let start_time = Instant::now();
156 let mut end_time: Option<Instant> = None;
157 let mut remaining_votes = committee.total_votes();
158 let mut highest_key_score = 0;
159 let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
160 let mut error_scores = HashMap::new();
161
162 'vote_wait: while let Ok(Some((name, result))) = timeout(
163 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
164 responses.next(),
165 )
166 .await
167 {
168 remaining_votes -= committee.weight(&name);
169 match result {
170 Ok(value) => {
171 let key = group_by(&value);
172 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
173 entry.0 += committee.weight(&name);
174 entry.1.push((name, value));
175 highest_key_score = highest_key_score.max(entry.0);
176 }
177 Err(err) => {
178 let err = match err {
180 chain_client::Error::RemoteNodeError(err) => err,
181 err => NodeError::ResponseHandlingError {
182 error: err.to_string(),
183 },
184 };
185 let entry = error_scores.entry(err.clone()).or_insert(0);
186 *entry += committee.weight(&name);
187 }
188 }
189 if highest_key_score + remaining_votes < committee.quorum_threshold() {
191 break 'vote_wait;
192 }
193
194 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
197 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
198 }
199 }
200
201 let scores = value_scores
202 .values()
203 .map(|(weight, values)| (*weight, values.len()))
204 .collect();
205 if let Some((key, (_, values))) = value_scores
207 .into_iter()
208 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
209 {
210 return Ok((key, values));
211 }
212
213 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
214 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
215 sample.truncate(4);
216 Err(match sample.as_slice() {
217 [] => CommunicationError::NoConsensus(committee.quorum_threshold(), scores),
218 [(_, score), ..] if *score >= committee.validity_threshold() => {
219 CommunicationError::Trusted(sample.into_iter().next().unwrap().0)
221 }
222 _ => CommunicationError::Sample(sample),
224 })
225}
226
227impl<Env> ValidatorUpdater<Env>
228where
229 Env: Environment + 'static,
230{
231 fn warn_if_unexpected(&self, err: &NodeError) {
233 if !err.is_expected() {
234 tracing::warn!(
235 remote_node = self.remote_node.address(),
236 %err,
237 "unexpected error from validator",
238 );
239 }
240 }
241
    /// Sends a confirmed-block certificate to the validator, repairing what the validator
    /// reports as missing and retrying.
    ///
    /// The first attempt uses `handle_optimized_confirmed_certificate`; every retry uses
    /// `handle_confirmed_certificate`. Each of the three repairs below is attempted at most
    /// once (tracked by the `sent_*` flags). Any other outcome — success, an unrelated
    /// error, or a repeat of an error already handled — is returned to the caller.
    ///
    /// # Errors
    ///
    /// Returns the validator's final error, or any error raised while performing a repair.
    #[instrument(
        level = "trace", skip_all, err(level = Level::DEBUG),
        fields(chain_id = %certificate.block().header.chain_id)
    )]
    async fn send_confirmed_certificate(
        &mut self,
        certificate: &CacheArc<GenericCertificate<ConfirmedBlock>>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let mut result = self
            .remote_node
            .handle_optimized_confirmed_certificate(certificate, delivery)
            .await;

        let mut sent_admin_chain = false;
        let mut sent_blobs = false;
        let mut sent_blocks = false;
        loop {
            match result {
                // The certificate is for a chain other than the admin chain, and the only
                // missing events are epoch-stream events of the admin chain: upload the
                // admin chain.
                Err(NodeError::EventsNotFound(event_ids))
                    if !sent_admin_chain
                        && certificate.inner().chain_id() != self.admin_chain_id
                        && event_ids.iter().all(|event_id| {
                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
                                && event_id.chain_id == self.admin_chain_id
                        }) =>
                {
                    self.update_admin_chain().await?;
                    sent_admin_chain = true;
                }
                // The validator lacks blobs: upload them from local storage.
                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
                    // NOTE(review): presumably verifies that the reported blobs are ones
                    // this certificate actually requires — confirm in `RemoteNode`.
                    self.remote_node
                        .check_blobs_not_found(certificate, &blob_ids)?;
                    let maybe_blobs = self
                        .client
                        .local_node
                        .read_blobs_from_storage(&blob_ids)
                        .await?;
                    // `None` means the blobs could not be read locally either.
                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
                    self.remote_node
                        .node
                        .upload_blobs(blobs.into_iter().map(|b| b.into_std()).collect())
                        .await?;
                    sent_blobs = true;
                }
                // The validator lacks other blocks: send their certificates from local
                // storage, in the order the validator listed them.
                Err(NodeError::BlocksNotFound(hashes)) if !sent_blocks => {
                    let storage = self.client.local_node.storage_client();
                    let certificates = storage.read_certificates(&hashes).await?;
                    for (hash, maybe_cert) in hashes.iter().zip(certificates) {
                        let cert = maybe_cert.ok_or_else(|| {
                            chain_client::Error::ReadCertificatesError(vec![*hash])
                        })?;
                        self.remote_node
                            .handle_confirmed_certificate(cert, delivery)
                            .await?;
                    }
                    sent_blocks = true;
                }
                // Success, or an error that no (remaining) repair applies to.
                result => {
                    if let Err(err) = &result {
                        self.warn_if_unexpected(err);
                    }
                    return Ok(result?);
                }
            }
            // A repair was performed: try the certificate again.
            result = self
                .remote_node
                .handle_confirmed_certificate(certificate.clone(), delivery)
                .await;
        }
    }
321
322 async fn send_validated_certificate(
323 &mut self,
324 certificate: GenericCertificate<ValidatedBlock>,
325 delivery: CrossChainMessageDelivery,
326 ) -> Result<Box<ChainInfo>, chain_client::Error> {
327 let result = self
328 .remote_node
329 .handle_optimized_validated_certificate(&certificate, delivery)
330 .await;
331
332 let chain_id = certificate.inner().chain_id();
333 match &result {
334 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
335 self.remote_node
336 .check_blobs_not_found(&certificate, blob_ids)?;
337 let blobs = self
340 .client
341 .local_node
342 .get_locking_blobs(blob_ids, chain_id)
343 .await?
344 .ok_or_else(|| original_err.clone())?;
345 self.remote_node.send_pending_blobs(chain_id, blobs).await?;
346 }
347 Err(error) => {
348 self.sync_if_needed(
349 chain_id,
350 certificate.round,
351 certificate.block().header.height,
352 error,
353 )
354 .await?;
355 }
356 _ => return Ok(result?),
357 }
358 let result = self
359 .remote_node
360 .handle_validated_certificate(certificate)
361 .await;
362 if let Err(err) = &result {
363 self.warn_if_unexpected(err);
364 }
365 Ok(result?)
366 }
367
368 async fn request_timeout(
373 &mut self,
374 chain_id: ChainId,
375 round: Round,
376 height: BlockHeight,
377 ) -> Result<Box<ChainInfo>, chain_client::Error> {
378 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
379 let result = self
380 .remote_node
381 .handle_chain_info_query(query.clone())
382 .await;
383 if let Err(err) = &result {
384 self.sync_if_needed(chain_id, round, height, err).await?;
385 self.warn_if_unexpected(err);
386 }
387 Ok(result?)
388 }
389
390 async fn sync_if_needed(
392 &mut self,
393 chain_id: ChainId,
394 round: Round,
395 height: BlockHeight,
396 error: &NodeError,
397 ) -> Result<(), chain_client::Error> {
398 let address = &self.remote_node.address();
399 match error {
400 NodeError::WrongRound(validator_round) if *validator_round > round => {
401 tracing::debug!(
402 address, %chain_id, %validator_round, %round,
403 "validator is at a higher round; synchronizing",
404 );
405 self.client
406 .synchronize_chain_state_from(&self.remote_node, chain_id)
407 .await?;
408 }
409 NodeError::UnexpectedBlockHeight {
410 expected_block_height,
411 found_block_height,
412 } if expected_block_height > found_block_height => {
413 tracing::debug!(
414 address,
415 %chain_id,
416 %expected_block_height,
417 %found_block_height,
418 "validator is at a higher height; synchronizing",
419 );
420 self.client
421 .synchronize_chain_state_from(&self.remote_node, chain_id)
422 .await?;
423 }
424 NodeError::WrongRound(validator_round) if *validator_round < round => {
425 tracing::debug!(
426 address, %chain_id, %validator_round, %round,
427 "validator is at a lower round; sending chain info",
428 );
429 self.send_chain_information(
430 chain_id,
431 height,
432 CrossChainMessageDelivery::NonBlocking,
433 None,
434 )
435 .await?;
436 }
437 NodeError::UnexpectedBlockHeight {
438 expected_block_height,
439 found_block_height,
440 } if expected_block_height < found_block_height => {
441 tracing::debug!(
442 address,
443 %chain_id,
444 %expected_block_height,
445 %found_block_height,
446 "Validator is at a lower height; sending chain info.",
447 );
448 self.send_chain_information(
449 chain_id,
450 height,
451 CrossChainMessageDelivery::NonBlocking,
452 None,
453 )
454 .await?;
455 }
456 NodeError::InactiveChain(inactive_chain_id) => {
457 tracing::debug!(
458 address,
459 chain_id = %inactive_chain_id,
460 "Validator has inactive chain; sending chain info.",
461 );
462 self.send_chain_information(
463 *inactive_chain_id,
464 height,
465 CrossChainMessageDelivery::NonBlocking,
466 None,
467 )
468 .await?;
469 }
470 _ => {}
471 }
472 Ok(())
473 }
474
    /// Submits a block proposal to the validator, repairing what the validator reports as
    /// missing and resubmitting until it accepts or fails with an error that cannot be
    /// repaired here.
    ///
    /// Each loop iteration sends the proposal once. Depending on the error, chain
    /// information, events' publisher chains or blobs are uploaded, or the task sleeps to
    /// compensate for clock skew, and the proposal is sent again. `blob_ids` is consumed by
    /// the first blob repair, so that repair happens at most once.
    ///
    /// # Errors
    ///
    /// Returns the validator's error when no repair applies, or any error raised while
    /// performing a repair.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // Highest height already sent per origin chain, to avoid re-sending the same update.
        let mut sent_cross_chain_updates = BTreeMap::new();
        // Publisher chains already uploaded in response to missing events.
        let mut publisher_chain_ids_sent = BTreeSet::new();
        let storage = self.client.local_node.storage_client();
        loop {
            // Sampled before the request so it can be compared with the validator's clock.
            let local_time = storage.clock().current_time();
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                // The validator is in a different round: send it our chain information.
                Err(NodeError::WrongRound(_round)) => {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // The validator expected a lower height than the proposal's: it is behind.
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // The validator lacks a cross-chain update from `origin`. Only handled if
                // this origin has not already been sent up to at least this height.
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // The target height is exclusive, hence `height + 1`.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                // The validator lacks events: upload the publisher chains not sent so far.
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    // Every publisher chain was already sent: give up with the same error.
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // The validator rejected the proposal with a chain error: try to pull its
                // chain state (best effort), then report the rejection.
                Err(error @ NodeError::ChainError { .. }) => {
                    self.warn_if_unexpected(&error);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %error,
                        "validator rejected proposal; pulling manager state",
                    );
                    if let Err(sync_err) = self
                        .client
                        .synchronize_chain_state_from(&self.remote_node, chain_id)
                        .await
                    {
                        tracing::debug!(%sync_err, "failed to pull manager state from validator");
                    }
                    return Err(error.into());
                }
                // Missing blobs (or an inactive chain) while there are still blob IDs to try.
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // Blobs published by the proposed block itself are sent as pending blobs.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // `mem::take` empties `blob_ids`, so this arm cannot match again.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;

                    tracing::debug!("Sending chains for missing blobs");
                    self.send_chain_info_for_blobs(
                        &missing_blob_ids,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // The validator rejected the block's timestamp against its own clock.
                Err(NodeError::InvalidTimestamp {
                    block_timestamp,
                    local_time: validator_local_time,
                    ..
                }) => {
                    // Difference between our clock and the clock the validator reported.
                    let clock_skew = local_time.delta_since(validator_local_time);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %block_timestamp,
                        ?clock_skew,
                        "validator's clock is behind; waiting and retrying",
                    );
                    // A failed send is deliberately ignored via `.ok()`.
                    clock_skew_sender
                        .send((self.remote_node.public_key, clock_skew))
                        .ok();
                    // Wait until the block timestamp, shifted by the observed skew.
                    storage
                        .clock()
                        .sleep_until(block_timestamp.saturating_add(clock_skew))
                        .await;
                }
                // Nothing to repair: log if unexpected and report the error.
                Err(err) => {
                    self.warn_if_unexpected(&err);
                    return Err(err.into());
                }
            }
        }
    }
676
677 async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
678 let local_admin_info = self
679 .client
680 .local_node
681 .chain_info(self.admin_chain_id)
682 .await?;
683 Box::pin(self.send_chain_information(
684 self.admin_chain_id,
685 local_admin_info.next_block_height,
686 CrossChainMessageDelivery::NonBlocking,
687 None,
688 ))
689 .await
690 }
691
692 #[instrument(level = "debug", skip_all, fields(%chain_id))]
722 pub async fn send_chain_information(
723 &mut self,
724 chain_id: ChainId,
725 target_block_height: BlockHeight,
726 delivery: CrossChainMessageDelivery,
727 latest_certificate: Option<CacheArc<GenericCertificate<ConfirmedBlock>>>,
728 ) -> Result<(), chain_client::Error> {
729 let info = if target_block_height.0 > 0 {
731 self.sync_chain_height(chain_id, target_block_height, delivery, latest_certificate)
732 .await?
733 } else {
734 self.initialize_new_chain_on_validator(chain_id).await?
735 };
736
737 let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
741 let query = ChainInfoQuery::new(chain_id).with_manager_values();
742 let local_info = match self.client.local_node.handle_chain_info_query(query).await {
743 Ok(response) => response.info,
744 Err(LocalNodeError::BlobsNotFound(_)) => {
748 tracing::debug!("local chain description not fully available, skipping round sync");
749 return Ok(());
750 }
751 Err(error) => return Err(error.into()),
752 };
753
754 let manager = local_info.manager;
755 if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
756 return Ok(());
757 }
758
759 self.sync_consensus_round(remote_round, &manager).await
761 }
762
763 async fn sync_chain_height(
769 &mut self,
770 chain_id: ChainId,
771 target_block_height: BlockHeight,
772 delivery: CrossChainMessageDelivery,
773 latest_certificate: Option<CacheArc<GenericCertificate<ConfirmedBlock>>>,
774 ) -> Result<Box<ChainInfo>, chain_client::Error> {
775 let height = target_block_height.try_sub_one()?;
776
777 let certificate = if let Some(cert) = latest_certificate {
779 cert
780 } else {
781 self.read_certificates_for_heights(chain_id, vec![height])
782 .await?
783 .into_iter()
784 .next()
785 .ok_or_else(|| {
786 chain_client::Error::InternalError(
787 "failed to read latest certificate for height sync",
788 )
789 })?
790 };
791
792 let info = match self
794 .send_confirmed_certificate(&certificate, delivery)
795 .await
796 {
797 Ok(info) => info,
798 Err(error) => {
799 tracing::debug!(
800 address = self.remote_node.address(), %error,
801 "validator failed to handle confirmed certificate; sending whole chain",
802 );
803 let query = ChainInfoQuery::new(chain_id);
804 self.remote_node.handle_chain_info_query(query).await?
805 }
806 };
807
808 let info = self
811 .push_checkpoint_if_useful(chain_id, info, delivery)
812 .await?;
813
814 let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
816 .map(BlockHeight)
817 .collect();
818
819 if heights.is_empty() {
820 return Ok(info);
821 }
822
823 let batch_size = self.client.options().certificate_upload_batch_size;
824 for chunk in heights.chunks(batch_size) {
825 let certificates = self
826 .read_certificates_for_heights(chain_id, chunk.to_vec())
827 .await?;
828
829 for certificate in certificates {
830 self.send_confirmed_certificate(&certificate, delivery)
831 .await?;
832 }
833 }
834
835 Ok(info)
836 }
837
838 async fn read_certificates_for_heights(
840 &self,
841 chain_id: ChainId,
842 heights: Vec<BlockHeight>,
843 ) -> Result<Vec<CacheArc<GenericCertificate<ConfirmedBlock>>>, chain_client::Error> {
844 let storage = self.client.local_node.storage_client();
845
846 let certificates_by_height = storage
847 .read_certificates_by_heights(chain_id, &heights)
848 .await?;
849
850 Ok(certificates_by_height.into_iter().flatten().collect())
851 }
852
853 async fn push_checkpoint_if_useful(
861 &mut self,
862 chain_id: ChainId,
863 info: Box<ChainInfo>,
864 delivery: CrossChainMessageDelivery,
865 ) -> Result<Box<ChainInfo>, chain_client::Error> {
866 let local_query = ChainInfoQuery::new(chain_id).with_latest_checkpoint_height();
867 let local_info = self
868 .client
869 .local_node
870 .handle_chain_info_query(local_query)
871 .await?
872 .info;
873 let Some(checkpoint_height) = local_info.requested_latest_checkpoint_height else {
874 return Ok(info);
875 };
876 if checkpoint_height < info.next_block_height {
877 return Ok(info);
878 }
879 let Some(checkpoint_cert) = self
880 .read_certificates_for_heights(chain_id, vec![checkpoint_height])
881 .await?
882 .into_iter()
883 .next()
884 else {
885 return Ok(info);
886 };
887 self.send_confirmed_certificate(&checkpoint_cert, delivery)
888 .await
889 }
890
891 async fn initialize_new_chain_on_validator(
897 &self,
898 chain_id: ChainId,
899 ) -> Result<Box<ChainInfo>, chain_client::Error> {
900 self.send_chain_info_for_blobs(
902 &[BlobId::new(chain_id.0, BlobType::ChainDescription)],
903 CrossChainMessageDelivery::NonBlocking,
904 )
905 .await?;
906
907 let query = ChainInfoQuery::new(chain_id);
909 let info = self.remote_node.handle_chain_info_query(query).await?;
910 Ok(info)
911 }
912
913 async fn sync_consensus_round(
920 &self,
921 remote_round: Round,
922 manager: &linera_chain::manager::ChainManagerInfo,
923 ) -> Result<(), chain_client::Error> {
924 let target_round = manager.current_round;
925
926 if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.as_deref() {
931 if validated.round == target_round {
932 match self
933 .remote_node
934 .handle_optimized_validated_certificate(
935 validated,
936 CrossChainMessageDelivery::NonBlocking,
937 )
938 .await
939 {
940 Ok(info) => {
941 tracing::debug!("successfully sent validated block for round sync");
942 if info.manager.current_round >= target_round {
943 return Ok(());
944 }
945 }
946 Err(error) => {
947 tracing::debug!(%error, "failed to send validated block");
948 }
949 }
950 }
951 }
952
953 if let Some(cert) = &manager.timeout {
956 if cert.round >= remote_round {
957 match self
958 .remote_node
959 .handle_timeout_certificate(cert.as_ref().clone())
960 .await
961 {
962 Ok(info) => {
963 tracing::debug!(round = %cert.round, "successfully sent timeout certificate");
964 if info.manager.current_round >= target_round {
965 return Ok(());
966 }
967 }
968 Err(error) => {
969 tracing::debug!(%error, round = %cert.round, "failed to send timeout certificate");
970 }
971 }
972 }
973 }
974
975 for proposal in manager
977 .requested_proposed
978 .iter()
979 .chain(manager.requested_signed_proposal.iter())
980 {
981 if proposal.content.round == target_round {
982 match self
983 .remote_node
984 .handle_block_proposal(proposal.clone())
985 .await
986 {
987 Ok(info) => {
988 tracing::debug!("successfully sent block proposal for round sync");
989 if info.manager.current_round >= target_round {
990 return Ok(());
991 }
992 }
993 Err(error) => {
994 tracing::debug!(%error, "failed to send block proposal");
995 }
996 }
997 }
998 }
999
1000 tracing::debug!("round sync not performed: no applicable data or all attempts failed");
1003 Ok(())
1004 }
1005
1006 async fn send_chain_info_for_blobs(
1012 &self,
1013 blob_ids: &[BlobId],
1014 delivery: CrossChainMessageDelivery,
1015 ) -> Result<(), chain_client::Error> {
1016 let blob_states = self
1017 .client
1018 .local_node
1019 .read_blob_states_from_storage(blob_ids)
1020 .await?;
1021
1022 let mut chain_heights: BTreeMap<ChainId, BTreeSet<BlockHeight>> = BTreeMap::new();
1023 for blob_state in blob_states {
1024 match blob_state.origin {
1025 BlobOrigin::Genesis => continue,
1028 BlobOrigin::Published {
1029 chain_id,
1030 block_height,
1031 } => {
1032 chain_heights
1033 .entry(chain_id)
1034 .or_default()
1035 .insert(block_height);
1036 }
1037 }
1038 }
1039
1040 self.send_chain_info_at_heights(chain_heights, delivery)
1041 .await
1042 }
1043
1044 async fn send_chain_info_at_heights(
1050 &self,
1051 chain_heights: impl IntoIterator<Item = (ChainId, BTreeSet<BlockHeight>)>,
1052 delivery: CrossChainMessageDelivery,
1053 ) -> Result<(), chain_client::Error> {
1054 future::try_join_all(chain_heights.into_iter().map(|(chain_id, heights)| {
1055 let mut updater = self.clone();
1056 async move {
1057 let heights_vec = heights.into_iter().collect::<Vec<_>>();
1059 let certificates = updater
1060 .client
1061 .local_node
1062 .storage_client()
1063 .read_certificates_by_heights(chain_id, &heights_vec)
1064 .await?
1065 .into_iter()
1066 .flatten()
1067 .collect::<Vec<_>>();
1068
1069 for certificate in certificates {
1071 updater
1072 .send_confirmed_certificate(&certificate, delivery)
1073 .await?;
1074 }
1075
1076 Ok::<_, chain_client::Error>(())
1077 }
1078 }))
1079 .await?;
1080 Ok(())
1081 }
1082
1083 async fn send_chain_info_up_to_heights(
1084 &self,
1085 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
1086 delivery: CrossChainMessageDelivery,
1087 ) -> Result<(), chain_client::Error> {
1088 future::try_join_all(chain_heights.into_iter().map(|(chain_id, height)| {
1089 let mut updater = self.clone();
1090 async move {
1091 updater
1092 .send_chain_information(chain_id, height, delivery, None)
1093 .await
1094 }
1095 }))
1096 .await?;
1097 Ok(())
1098 }
1099
1100 pub async fn send_chain_update(
1101 &mut self,
1102 action: CommunicateAction,
1103 ) -> Result<LiteVote, chain_client::Error> {
1104 let chain_id = match &action {
1105 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
1106 CommunicateAction::FinalizeBlock { certificate, .. } => {
1107 certificate.inner().block().header.chain_id
1108 }
1109 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
1110 };
1111 let vote = match action {
1113 CommunicateAction::SubmitBlock {
1114 proposal,
1115 blob_ids,
1116 clock_skew_sender,
1117 } => {
1118 let info = self
1119 .send_block_proposal(proposal, blob_ids, clock_skew_sender)
1120 .await?;
1121 info.manager.pending.ok_or_else(|| {
1122 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
1123 })?
1124 }
1125 CommunicateAction::FinalizeBlock {
1126 certificate,
1127 delivery,
1128 } => {
1129 let info = self
1130 .send_validated_certificate(*certificate, delivery)
1131 .await?;
1132 info.manager.pending.ok_or_else(|| {
1133 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
1134 })?
1135 }
1136 CommunicateAction::RequestTimeout { round, height, .. } => {
1137 let info = self.request_timeout(chain_id, round, height).await?;
1138 info.manager.timeout_vote.ok_or_else(|| {
1139 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
1140 })?
1141 }
1142 };
1143 vote.check(self.remote_node.public_key)?;
1144 Ok(vote)
1145 }
1146}