1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10 sync::Arc,
11};
12
13use futures::{
14 stream::{FuturesUnordered, TryStreamExt},
15 Future, StreamExt,
16};
17use linera_base::{
18 crypto::ValidatorPublicKey,
19 data_types::{BlockHeight, Round, TimeDelta},
20 ensure,
21 identifiers::{BlobId, BlobType, ChainId, StreamId},
22 time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25 data_types::{BlockProposal, LiteVote},
26 manager::LockingBlock,
27 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{Clock, Storage};
31use thiserror::Error;
32use tokio::sync::mpsc;
33use tracing::{instrument, Level};
34
35use crate::{
36 client::{chain_client, Client},
37 data_types::{ChainInfo, ChainInfoQuery},
38 environment::Environment,
39 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
40 remote_node::RemoteNode,
41 LocalNodeError,
42};
43
/// Default quorum grace period.
///
/// NOTE(review): presumably the default for the `quorum_grace_period` argument of
/// [`communicate_with_quorum`], where the value is multiplied with the time it took to
/// reach a quorum — confirm against callers.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;

/// A clock-skew measurement paired with the public key of the validator it was measured
/// against. Values of this type are sent over the `clock_skew_sender` channel carried by
/// [`CommunicateAction::SubmitBlock`].
pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
/// How long [`communicate_with_quorum`] waits for the next validator response as long as
/// no quorum has been reached yet: 24 hours.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);

/// A request that `ValidatorUpdater::send_chain_update` performs against one validator in
/// order to obtain a vote.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal to the validator.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blobs the proposal depends on; used to help the validator if it reports
        /// missing blobs or an inactive chain.
        blob_ids: Vec<BlobId>,
        /// Channel on which clock-skew measurements are reported when the validator
        /// rejects the proposal with an invalid-timestamp error.
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    },
    /// Send a validated-block certificate to the validator.
    FinalizeBlock {
        /// The certificate for the validated block.
        certificate: Box<ValidatedBlockCertificate>,
        /// Cross-chain delivery mode used for the first attempt to send the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask the validator for a timeout vote.
    RequestTimeout {
        /// The chain the timeout is requested for.
        chain_id: ChainId,
        /// The block height the timeout is requested for.
        height: BlockHeight,
        /// The round the timeout is requested for.
        round: Round,
    },
}
72
73impl CommunicateAction {
74 pub fn round(&self) -> Round {
76 match self {
77 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
78 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
79 CommunicateAction::RequestTimeout { round, .. } => *round,
80 }
81 }
82}
83
/// Sends chain data (certificates, blobs, proposals) from the local node to one
/// validator and retrieves that validator's votes.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator this updater talks to.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The client whose `local_node` is the source of the data sent to the validator.
    pub client: Arc<Client<Env>>,
    /// The ID of the admin chain, which is sent to the validator when it reports missing
    /// epoch events.
    pub admin_chain_id: ChainId,
}
92
93impl<Env: Environment> Clone for ValidatorUpdater<Env> {
94 fn clone(&self) -> Self {
95 ValidatorUpdater {
96 remote_node: self.remote_node.clone(),
97 client: self.client.clone(),
98 admin_chain_id: self.admin_chain_id,
99 }
100 }
101}
102
/// The ways in which [`communicate_with_quorum`] can fail to obtain a quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No group of equal responses reached a quorum, and no validator returned an error.
    ///
    /// Holds the quorum threshold and, for every distinct group of responses, the pair
    /// `(total weight, number of validators)`.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// The error with the highest total weight, which reached the committee's validity
    /// threshold.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No error reached the validity threshold. Holds up to four errors, the ones with
    /// the highest total weight first, each paired with that weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
121
122pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
129 validator_clients: &'a [RemoteNode<A>],
130 committee: &Committee,
131 group_by: G,
132 execute: F,
133 quorum_grace_period: f64,
135) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
136where
137 A: ValidatorNode + Clone + 'static,
138 F: Clone + Fn(RemoteNode<A>) -> R,
139 R: Future<Output = Result<V, chain_client::Error>> + 'a,
140 G: Fn(&V) -> K,
141 K: Hash + PartialEq + Eq + Clone + 'static,
142 V: 'static,
143{
144 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
145 .iter()
146 .filter_map(|remote_node| {
147 if committee.weight(&remote_node.public_key) == 0 {
148 return None;
151 }
152 let execute = execute.clone();
153 let remote_node = remote_node.clone();
154 Some(async move { (remote_node.public_key, execute(remote_node).await) })
155 })
156 .collect();
157
158 let start_time = Instant::now();
159 let mut end_time: Option<Instant> = None;
160 let mut remaining_votes = committee.total_votes();
161 let mut highest_key_score = 0;
162 let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
163 let mut error_scores = HashMap::new();
164
165 'vote_wait: while let Ok(Some((name, result))) = timeout(
166 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
167 responses.next(),
168 )
169 .await
170 {
171 remaining_votes -= committee.weight(&name);
172 match result {
173 Ok(value) => {
174 let key = group_by(&value);
175 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
176 entry.0 += committee.weight(&name);
177 entry.1.push((name, value));
178 highest_key_score = highest_key_score.max(entry.0);
179 }
180 Err(err) => {
181 let err = match err {
183 chain_client::Error::RemoteNodeError(err) => err,
184 err => NodeError::ResponseHandlingError {
185 error: err.to_string(),
186 },
187 };
188 let entry = error_scores.entry(err.clone()).or_insert(0);
189 *entry += committee.weight(&name);
190 }
191 }
192 if highest_key_score + remaining_votes < committee.quorum_threshold() {
194 break 'vote_wait;
195 }
196
197 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
200 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
201 }
202 }
203
204 let scores = value_scores
205 .values()
206 .map(|(weight, values)| (*weight, values.len()))
207 .collect();
208 if let Some((key, (_, values))) = value_scores
210 .into_iter()
211 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
212 {
213 return Ok((key, values));
214 }
215
216 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
217 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
218 sample.truncate(4);
219 Err(match sample.as_slice() {
220 [] => CommunicationError::NoConsensus(committee.quorum_threshold(), scores),
221 [(_, score), ..] if *score >= committee.validity_threshold() => {
222 CommunicationError::Trusted(sample.into_iter().next().unwrap().0)
224 }
225 _ => CommunicationError::Sample(sample),
227 })
228}
229
230impl<Env> ValidatorUpdater<Env>
231where
232 Env: Environment + 'static,
233{
234 fn warn_if_unexpected(&self, err: &NodeError) {
236 if !err.is_expected() {
237 tracing::warn!(
238 remote_node = self.remote_node.address(),
239 %err,
240 "unexpected error from validator",
241 );
242 }
243 }
244
    /// Sends a confirmed-block certificate to the validator and returns the validator's
    /// chain info.
    ///
    /// The first attempt uses `handle_optimized_confirmed_certificate`; every retry uses
    /// `handle_confirmed_certificate`. Two kinds of failure are repaired, each at most
    /// once, before the certificate is sent again:
    ///
    /// * `EventsNotFound` where all events belong to the epoch stream of the admin chain
    ///   (and the certificate is not for the admin chain itself): the admin chain is
    ///   sent to the validator.
    /// * `BlobsNotFound`: the blobs are read from local storage and uploaded.
    ///
    /// # Errors
    ///
    /// Any other error from the validator is returned (and logged if unexpected), as is
    /// any error raised while repairing one of the failures above.
    #[instrument(
        level = "trace", skip_all, err(level = Level::DEBUG),
        fields(chain_id = %certificate.block().header.chain_id)
    )]
    async fn send_confirmed_certificate(
        &mut self,
        certificate: &Arc<GenericCertificate<ConfirmedBlock>>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let mut result = self
            .remote_node
            .handle_optimized_confirmed_certificate(certificate, delivery)
            .await;

        // Each repair is tried only once, so the loop below runs at most three times.
        let mut sent_admin_chain = false;
        let mut sent_blobs = false;
        loop {
            match result {
                Err(NodeError::EventsNotFound(event_ids))
                    if !sent_admin_chain
                        && certificate.inner().chain_id() != self.admin_chain_id
                        && event_ids.iter().all(|event_id| {
                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
                                && event_id.chain_id == self.admin_chain_id
                        }) =>
                {
                    // Only epoch events of the admin chain are missing: send that chain.
                    self.update_admin_chain().await?;
                    sent_admin_chain = true;
                }
                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
                    // Validate the validator's claim before acting on it.
                    self.remote_node
                        .check_blobs_not_found(certificate, &blob_ids)?;
                    let maybe_blobs = self
                        .client
                        .local_node
                        .read_blobs_from_storage(&blob_ids)
                        .await?;
                    // If the blobs are not available locally either, report them as missing.
                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
                    self.remote_node.node.upload_blobs(blobs).await?;
                    sent_blobs = true;
                }
                result => {
                    // Success, or an error that cannot be (or has already been) repaired.
                    if let Err(err) = &result {
                        self.warn_if_unexpected(err);
                    }
                    return Ok(result?);
                }
            }
            // A repair was made: retry with the non-optimized handler.
            result = self
                .remote_node
                .handle_confirmed_certificate(certificate.clone(), delivery)
                .await;
        }
    }
302
    /// Sends a validated-block certificate to the validator and returns the validator's
    /// chain info.
    ///
    /// The first attempt uses `handle_optimized_validated_certificate`. If it fails, one
    /// repair is attempted and the certificate is sent once more with
    /// `handle_validated_certificate`:
    ///
    /// * `BlobsNotFound`: the blobs are fetched with `get_locking_blobs` from the local
    ///   node and sent to the validator as pending blobs. If the local node does not
    ///   have them, the validator's original error is returned.
    /// * any other error: handed to [`Self::sync_if_needed`].
    ///
    /// # Errors
    ///
    /// Returns errors raised during the repair, or the error of the second attempt
    /// (which is also logged if unexpected).
    async fn send_validated_certificate(
        &mut self,
        certificate: GenericCertificate<ValidatedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let result = self
            .remote_node
            .handle_optimized_validated_certificate(&certificate, delivery)
            .await;

        let chain_id = certificate.inner().chain_id();
        match &result {
            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
                // Validate the validator's claim before acting on it.
                self.remote_node
                    .check_blobs_not_found(&certificate, blob_ids)?;
                let blobs = self
                    .client
                    .local_node
                    .get_locking_blobs(blob_ids, chain_id)
                    .await?
                    .ok_or_else(|| original_err.clone())?;
                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
            }
            Err(error) => {
                self.sync_if_needed(
                    chain_id,
                    certificate.round,
                    certificate.block().header.height,
                    error,
                )
                .await?;
            }
            // The first attempt succeeded.
            _ => return Ok(result?),
        }
        // Second and last attempt, after the repair above.
        let result = self
            .remote_node
            .handle_validated_certificate(certificate)
            .await;
        if let Err(err) = &result {
            self.warn_if_unexpected(err);
        }
        Ok(result?)
    }
348
349 async fn request_timeout(
354 &mut self,
355 chain_id: ChainId,
356 round: Round,
357 height: BlockHeight,
358 ) -> Result<Box<ChainInfo>, chain_client::Error> {
359 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
360 let result = self
361 .remote_node
362 .handle_chain_info_query(query.clone())
363 .await;
364 if let Err(err) = &result {
365 self.sync_if_needed(chain_id, round, height, err).await?;
366 self.warn_if_unexpected(err);
367 }
368 Ok(result?)
369 }
370
371 async fn sync_if_needed(
373 &mut self,
374 chain_id: ChainId,
375 round: Round,
376 height: BlockHeight,
377 error: &NodeError,
378 ) -> Result<(), chain_client::Error> {
379 let address = &self.remote_node.address();
380 match error {
381 NodeError::WrongRound(validator_round) if *validator_round > round => {
382 tracing::debug!(
383 address, %chain_id, %validator_round, %round,
384 "validator is at a higher round; synchronizing",
385 );
386 self.client
387 .synchronize_chain_state_from(&self.remote_node, chain_id)
388 .await?;
389 }
390 NodeError::UnexpectedBlockHeight {
391 expected_block_height,
392 found_block_height,
393 } if expected_block_height > found_block_height => {
394 tracing::debug!(
395 address,
396 %chain_id,
397 %expected_block_height,
398 %found_block_height,
399 "validator is at a higher height; synchronizing",
400 );
401 self.client
402 .synchronize_chain_state_from(&self.remote_node, chain_id)
403 .await?;
404 }
405 NodeError::WrongRound(validator_round) if *validator_round < round => {
406 tracing::debug!(
407 address, %chain_id, %validator_round, %round,
408 "validator is at a lower round; sending chain info",
409 );
410 self.send_chain_information(
411 chain_id,
412 height,
413 CrossChainMessageDelivery::NonBlocking,
414 None,
415 )
416 .await?;
417 }
418 NodeError::UnexpectedBlockHeight {
419 expected_block_height,
420 found_block_height,
421 } if expected_block_height < found_block_height => {
422 tracing::debug!(
423 address,
424 %chain_id,
425 %expected_block_height,
426 %found_block_height,
427 "Validator is at a lower height; sending chain info.",
428 );
429 self.send_chain_information(
430 chain_id,
431 height,
432 CrossChainMessageDelivery::NonBlocking,
433 None,
434 )
435 .await?;
436 }
437 NodeError::InactiveChain(inactive_chain_id) => {
438 tracing::debug!(
439 address,
440 chain_id = %inactive_chain_id,
441 "Validator has inactive chain; sending chain info.",
442 );
443 self.send_chain_information(
444 *inactive_chain_id,
445 height,
446 CrossChainMessageDelivery::NonBlocking,
447 None,
448 )
449 .await?;
450 }
451 _ => {}
452 }
453 Ok(())
454 }
455
    /// Submits a block proposal to the validator, repairing known failures and retrying
    /// until the proposal is accepted or an error occurs that cannot be repaired.
    ///
    /// Repairs performed before a retry:
    ///
    /// * `WrongRound`: sends the proposal's chain up to the proposal's height.
    /// * `UnexpectedBlockHeight` where the found height is the proposal's height and is
    ///   above the expected one: sends the chain up to that height.
    /// * `MissingCrossChainUpdate` for the proposal's chain: sends the origin chain up to
    ///   the height after the reported one, with blocking delivery. This is done only
    ///   if no update at that height or above was already sent for the origin.
    /// * `EventsNotFound`: sends the publisher chains of the missing events that were
    ///   not sent before; fails with the same error if there are none left.
    /// * `BlobsNotFound` / `InactiveChain` while `blob_ids` is non-empty: sends the
    ///   blobs published by the proposal as pending blobs, asks the validator which of
    ///   the remaining blobs it is missing, and sends the chain information for those.
    ///   `blob_ids` is emptied in the process, so this repair happens at most once.
    /// * `InvalidTimestamp`: reports the measured clock skew through
    ///   `clock_skew_sender` and sleeps until the block timestamp plus that skew.
    ///
    /// # Errors
    ///
    /// Any other validator error is logged if unexpected and returned. Errors raised
    /// while performing a repair are returned as well.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // Highest height already sent per origin chain, to avoid resending the same update.
        let mut sent_cross_chain_updates = BTreeMap::new();
        // Publisher chains already sent in response to `EventsNotFound`.
        let mut publisher_chain_ids_sent = BTreeSet::new();
        let storage = self.client.local_node.storage_client();
        loop {
            // Sampled before the request so it can be compared with the validator's clock.
            let local_time = storage.clock().current_time();
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                Err(NodeError::WrongRound(_round)) => {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // The target height is exclusive, hence `height + 1`.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    // All publisher chains were sent already: retrying would not help.
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // Blobs published by the proposal itself are sent as pending blobs;
                    // they are removed from `blob_ids` first.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // `mem::take` leaves `blob_ids` empty, so this arm cannot match again.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;

                    tracing::debug!("Sending chains for missing blobs");
                    self.send_chain_info_for_blobs(
                        &missing_blob_ids,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::InvalidTimestamp {
                    block_timestamp,
                    local_time: validator_local_time,
                    ..
                }) => {
                    // Difference between our clock and the clock reported by the validator.
                    let clock_skew = local_time.delta_since(validator_local_time);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %block_timestamp,
                        ?clock_skew,
                        "validator's clock is behind; waiting and retrying",
                    );
                    // A send error (receiver dropped) is deliberately ignored.
                    clock_skew_sender
                        .send((self.remote_node.public_key, clock_skew))
                        .ok();
                    storage
                        .clock()
                        .sleep_until(block_timestamp.saturating_add(clock_skew))
                        .await;
                }
                Err(err) => {
                    // Not repairable here (or the repair was already used up).
                    self.warn_if_unexpected(&err);
                    return Err(err.into());
                }
            }
        }
    }
632
633 async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
634 let local_admin_info = self
635 .client
636 .local_node
637 .chain_info(self.admin_chain_id)
638 .await?;
639 Box::pin(self.send_chain_information(
640 self.admin_chain_id,
641 local_admin_info.next_block_height,
642 CrossChainMessageDelivery::NonBlocking,
643 None,
644 ))
645 .await
646 }
647
648 #[instrument(level = "debug", skip_all, fields(%chain_id))]
678 pub async fn send_chain_information(
679 &mut self,
680 chain_id: ChainId,
681 target_block_height: BlockHeight,
682 delivery: CrossChainMessageDelivery,
683 latest_certificate: Option<Arc<GenericCertificate<ConfirmedBlock>>>,
684 ) -> Result<(), chain_client::Error> {
685 let info = if target_block_height.0 > 0 {
687 self.sync_chain_height(chain_id, target_block_height, delivery, latest_certificate)
688 .await?
689 } else {
690 self.initialize_new_chain_on_validator(chain_id).await?
691 };
692
693 let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
697 let query = ChainInfoQuery::new(chain_id).with_manager_values();
698 let local_info = match self.client.local_node.handle_chain_info_query(query).await {
699 Ok(response) => response.info,
700 Err(LocalNodeError::BlobsNotFound(_)) => {
704 tracing::debug!("local chain description not fully available, skipping round sync");
705 return Ok(());
706 }
707 Err(error) => return Err(error.into()),
708 };
709
710 let manager = local_info.manager;
711 if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
712 return Ok(());
713 }
714
715 self.sync_consensus_round(remote_round, &manager).await
717 }
718
719 async fn sync_chain_height(
725 &mut self,
726 chain_id: ChainId,
727 target_block_height: BlockHeight,
728 delivery: CrossChainMessageDelivery,
729 latest_certificate: Option<Arc<GenericCertificate<ConfirmedBlock>>>,
730 ) -> Result<Box<ChainInfo>, chain_client::Error> {
731 let height = target_block_height.try_sub_one()?;
732
733 let certificate = if let Some(cert) = latest_certificate {
735 cert
736 } else {
737 self.read_certificates_for_heights(chain_id, vec![height])
738 .await?
739 .into_iter()
740 .next()
741 .ok_or_else(|| {
742 chain_client::Error::InternalError(
743 "failed to read latest certificate for height sync",
744 )
745 })?
746 };
747
748 let info = match self
750 .send_confirmed_certificate(&certificate, delivery)
751 .await
752 {
753 Ok(info) => info,
754 Err(error) => {
755 tracing::debug!(
756 address = self.remote_node.address(), %error,
757 "validator failed to handle confirmed certificate; sending whole chain",
758 );
759 let query = ChainInfoQuery::new(chain_id);
760 self.remote_node.handle_chain_info_query(query).await?
761 }
762 };
763
764 let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
766 .map(BlockHeight)
767 .collect();
768
769 if heights.is_empty() {
770 return Ok(info);
771 }
772
773 let batch_size = self.client.options().certificate_upload_batch_size;
774 for chunk in heights.chunks(batch_size) {
775 let certificates = self
776 .read_certificates_for_heights(chain_id, chunk.to_vec())
777 .await?;
778
779 for certificate in certificates {
780 self.send_confirmed_certificate(&certificate, delivery)
781 .await?;
782 }
783 }
784
785 Ok(info)
786 }
787
788 async fn read_certificates_for_heights(
790 &self,
791 chain_id: ChainId,
792 heights: Vec<BlockHeight>,
793 ) -> Result<Vec<Arc<GenericCertificate<ConfirmedBlock>>>, chain_client::Error> {
794 let storage = self.client.local_node.storage_client();
795
796 let certificates_by_height = storage
797 .read_certificates_by_heights(chain_id, &heights)
798 .await?;
799
800 Ok(certificates_by_height.into_iter().flatten().collect())
801 }
802
803 async fn initialize_new_chain_on_validator(
809 &self,
810 chain_id: ChainId,
811 ) -> Result<Box<ChainInfo>, chain_client::Error> {
812 self.send_chain_info_for_blobs(
814 &[BlobId::new(chain_id.0, BlobType::ChainDescription)],
815 CrossChainMessageDelivery::NonBlocking,
816 )
817 .await?;
818
819 let query = ChainInfoQuery::new(chain_id);
821 let info = self.remote_node.handle_chain_info_query(query).await?;
822 Ok(info)
823 }
824
825 async fn sync_consensus_round(
832 &self,
833 remote_round: Round,
834 manager: &linera_chain::manager::ChainManagerInfo,
835 ) -> Result<(), chain_client::Error> {
836 for proposal in manager
838 .requested_proposed
839 .iter()
840 .chain(manager.requested_signed_proposal.iter())
841 {
842 if proposal.content.round == manager.current_round {
843 match self
844 .remote_node
845 .handle_block_proposal(proposal.clone())
846 .await
847 {
848 Ok(_) => {
849 tracing::debug!("successfully sent block proposal for round sync");
850 return Ok(());
851 }
852 Err(error) => {
853 tracing::debug!(%error, "failed to send block proposal");
854 }
855 }
856 }
857 }
858
859 if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.as_deref() {
861 if validated.round == manager.current_round {
862 match self
863 .remote_node
864 .handle_optimized_validated_certificate(
865 validated,
866 CrossChainMessageDelivery::NonBlocking,
867 )
868 .await
869 {
870 Ok(_) => {
871 tracing::debug!("successfully sent validated block for round sync");
872 return Ok(());
873 }
874 Err(error) => {
875 tracing::debug!(%error, "failed to send validated block");
876 }
877 }
878 }
879 }
880
881 if let Some(cert) = &manager.timeout {
883 if cert.round >= remote_round {
884 match self
885 .remote_node
886 .handle_timeout_certificate(cert.as_ref().clone())
887 .await
888 {
889 Ok(_) => {
890 tracing::debug!(round = %cert.round, "successfully sent timeout certificate");
891 return Ok(());
892 }
893 Err(error) => {
894 tracing::debug!(%error, round = %cert.round, "failed to send timeout certificate");
895 }
896 }
897 }
898 }
899
900 tracing::debug!("round sync not performed: no applicable data or all attempts failed");
903 Ok(())
904 }
905
906 async fn send_chain_info_for_blobs(
912 &self,
913 blob_ids: &[BlobId],
914 delivery: CrossChainMessageDelivery,
915 ) -> Result<(), chain_client::Error> {
916 let blob_states = self
917 .client
918 .local_node
919 .read_blob_states_from_storage(blob_ids)
920 .await?;
921
922 let mut chain_heights: BTreeMap<ChainId, BTreeSet<BlockHeight>> = BTreeMap::new();
923 for blob_state in blob_states {
924 let block_chain_id = blob_state.chain_id;
925 let block_height = blob_state.block_height;
926 chain_heights
927 .entry(block_chain_id)
928 .or_default()
929 .insert(block_height);
930 }
931
932 self.send_chain_info_at_heights(chain_heights, delivery)
933 .await
934 }
935
936 async fn send_chain_info_at_heights(
942 &self,
943 chain_heights: impl IntoIterator<Item = (ChainId, BTreeSet<BlockHeight>)>,
944 delivery: CrossChainMessageDelivery,
945 ) -> Result<(), chain_client::Error> {
946 chain_heights
947 .into_iter()
948 .map(|(chain_id, heights)| {
949 let mut updater = self.clone();
950 async move {
951 let heights_vec: Vec<_> = heights.into_iter().collect();
953 let certificates = updater
954 .client
955 .local_node
956 .storage_client()
957 .read_certificates_by_heights(chain_id, &heights_vec)
958 .await?
959 .into_iter()
960 .flatten()
961 .collect::<Vec<_>>();
962
963 for certificate in certificates {
965 updater
966 .send_confirmed_certificate(&certificate, delivery)
967 .await?;
968 }
969
970 Ok::<_, chain_client::Error>(())
971 }
972 })
973 .collect::<FuturesUnordered<_>>()
974 .try_collect::<Vec<_>>()
975 .await?;
976 Ok(())
977 }
978
979 async fn send_chain_info_up_to_heights(
980 &self,
981 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
982 delivery: CrossChainMessageDelivery,
983 ) -> Result<(), chain_client::Error> {
984 chain_heights
985 .into_iter()
986 .map(|(chain_id, height)| {
987 let mut updater = self.clone();
988 async move {
989 updater
990 .send_chain_information(chain_id, height, delivery, None)
991 .await
992 }
993 })
994 .collect::<FuturesUnordered<_>>()
995 .try_collect::<Vec<_>>()
996 .await?;
997 Ok(())
998 }
999
1000 pub async fn send_chain_update(
1001 &mut self,
1002 action: CommunicateAction,
1003 ) -> Result<LiteVote, chain_client::Error> {
1004 let chain_id = match &action {
1005 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
1006 CommunicateAction::FinalizeBlock { certificate, .. } => {
1007 certificate.inner().block().header.chain_id
1008 }
1009 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
1010 };
1011 let vote = match action {
1013 CommunicateAction::SubmitBlock {
1014 proposal,
1015 blob_ids,
1016 clock_skew_sender,
1017 } => {
1018 let info = self
1019 .send_block_proposal(proposal, blob_ids, clock_skew_sender)
1020 .await?;
1021 info.manager.pending.ok_or_else(|| {
1022 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
1023 })?
1024 }
1025 CommunicateAction::FinalizeBlock {
1026 certificate,
1027 delivery,
1028 } => {
1029 let info = self
1030 .send_validated_certificate(*certificate, delivery)
1031 .await?;
1032 info.manager.pending.ok_or_else(|| {
1033 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
1034 })?
1035 }
1036 CommunicateAction::RequestTimeout { round, height, .. } => {
1037 let info = self.request_timeout(chain_id, round, height).await?;
1038 info.manager.timeout_vote.ok_or_else(|| {
1039 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
1040 })?
1041 }
1042 };
1043 vote.check(self.remote_node.public_key)?;
1044 Ok(vote)
1045 }
1046}