1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10 sync::Arc,
11};
12
13use futures::{
14 stream::{FuturesUnordered, TryStreamExt},
15 Future, StreamExt,
16};
17use linera_base::{
18 crypto::ValidatorPublicKey,
19 data_types::{BlockHeight, Round, TimeDelta},
20 ensure,
21 identifiers::{BlobId, BlobType, ChainId, StreamId},
22 time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25 data_types::{BlockProposal, LiteVote},
26 manager::LockingBlock,
27 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{Clock, ResultReadCertificates, Storage};
31use thiserror::Error;
32use tokio::sync::mpsc;
33use tracing::{instrument, Level};
34
35use crate::{
36 client::{chain_client, Client},
37 data_types::{ChainInfo, ChainInfoQuery},
38 environment::Environment,
39 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
40 remote_node::RemoteNode,
41 LocalNodeError,
42};
43
44pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;
47
48pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
50const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); #[derive(Clone)]
55pub enum CommunicateAction {
56 SubmitBlock {
57 proposal: Box<BlockProposal>,
58 blob_ids: Vec<BlobId>,
59 clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
61 },
62 FinalizeBlock {
63 certificate: Box<ValidatedBlockCertificate>,
64 delivery: CrossChainMessageDelivery,
65 },
66 RequestTimeout {
67 chain_id: ChainId,
68 height: BlockHeight,
69 round: Round,
70 },
71}
72
73impl CommunicateAction {
74 pub fn round(&self) -> Round {
76 match self {
77 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
78 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
79 CommunicateAction::RequestTimeout { round, .. } => *round,
80 }
81 }
82}
83
84pub struct ValidatorUpdater<Env>
85where
86 Env: Environment,
87{
88 pub remote_node: RemoteNode<Env::ValidatorNode>,
89 pub client: Arc<Client<Env>>,
90 pub admin_id: ChainId,
91}
92
93impl<Env: Environment> Clone for ValidatorUpdater<Env> {
94 fn clone(&self) -> Self {
95 ValidatorUpdater {
96 remote_node: self.remote_node.clone(),
97 client: self.client.clone(),
98 admin_id: self.admin_id,
99 }
100 }
101}
102
103#[derive(Error, Debug)]
105pub enum CommunicationError<E: fmt::Debug> {
106 #[error(
109 "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
110 )]
111 NoConsensus(u64, Vec<(u64, usize)>),
112 #[error("Failed to communicate with a quorum of validators: {0}")]
115 Trusted(E),
116 #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
119 Sample(Vec<(E, u64)>),
120}
121
122pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
129 validator_clients: &'a [RemoteNode<A>],
130 committee: &Committee,
131 group_by: G,
132 execute: F,
133 quorum_grace_period: f64,
135) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
136where
137 A: ValidatorNode + Clone + 'static,
138 F: Clone + Fn(RemoteNode<A>) -> R,
139 R: Future<Output = Result<V, chain_client::Error>> + 'a,
140 G: Fn(&V) -> K,
141 K: Hash + PartialEq + Eq + Clone + 'static,
142 V: 'static,
143{
144 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
145 .iter()
146 .filter_map(|remote_node| {
147 if committee.weight(&remote_node.public_key) == 0 {
148 return None;
151 }
152 let execute = execute.clone();
153 let remote_node = remote_node.clone();
154 Some(async move { (remote_node.public_key, execute(remote_node).await) })
155 })
156 .collect();
157
158 let start_time = Instant::now();
159 let mut end_time: Option<Instant> = None;
160 let mut remaining_votes = committee.total_votes();
161 let mut highest_key_score = 0;
162 let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
163 let mut error_scores = HashMap::new();
164
165 'vote_wait: while let Ok(Some((name, result))) = timeout(
166 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
167 responses.next(),
168 )
169 .await
170 {
171 remaining_votes -= committee.weight(&name);
172 match result {
173 Ok(value) => {
174 let key = group_by(&value);
175 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
176 entry.0 += committee.weight(&name);
177 entry.1.push((name, value));
178 highest_key_score = highest_key_score.max(entry.0);
179 }
180 Err(err) => {
181 let err = match err {
183 chain_client::Error::RemoteNodeError(err) => err,
184 err => NodeError::ResponseHandlingError {
185 error: err.to_string(),
186 },
187 };
188 let entry = error_scores.entry(err.clone()).or_insert(0);
189 *entry += committee.weight(&name);
190 if *entry >= committee.validity_threshold() {
191 return Err(CommunicationError::Trusted(err));
194 }
195 }
196 }
197 if highest_key_score + remaining_votes < committee.quorum_threshold() {
199 break 'vote_wait;
200 }
201
202 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
205 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
206 }
207 }
208
209 let scores = value_scores
210 .values()
211 .map(|(weight, values)| (*weight, values.len()))
212 .collect();
213 if let Some((key, (_, values))) = value_scores
215 .into_iter()
216 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
217 {
218 return Ok((key, values));
219 }
220
221 if error_scores.is_empty() {
222 return Err(CommunicationError::NoConsensus(
223 committee.quorum_threshold(),
224 scores,
225 ));
226 }
227
228 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
230 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
231 sample.truncate(4);
232 Err(CommunicationError::Sample(sample))
233}
234
235impl<Env> ValidatorUpdater<Env>
236where
237 Env: Environment + 'static,
238{
239 #[instrument(
240 level = "trace", skip_all, err(level = Level::WARN),
241 fields(chain_id = %certificate.block().header.chain_id)
242 )]
243 async fn send_confirmed_certificate(
244 &mut self,
245 certificate: GenericCertificate<ConfirmedBlock>,
246 delivery: CrossChainMessageDelivery,
247 ) -> Result<Box<ChainInfo>, chain_client::Error> {
248 let mut result = self
249 .remote_node
250 .handle_optimized_confirmed_certificate(&certificate, delivery)
251 .await;
252
253 let mut sent_admin_chain = false;
254 let mut sent_blobs = false;
255 loop {
256 match result {
257 Err(NodeError::EventsNotFound(event_ids))
258 if !sent_admin_chain
259 && certificate.inner().chain_id() != self.admin_id
260 && event_ids.iter().all(|event_id| {
261 event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
262 && event_id.chain_id == self.admin_id
263 }) =>
264 {
265 self.update_admin_chain().await?;
267 sent_admin_chain = true;
268 }
269 Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
270 self.remote_node
272 .check_blobs_not_found(&certificate, &blob_ids)?;
273 let maybe_blobs = self
275 .client
276 .local_node
277 .read_blobs_from_storage(&blob_ids)
278 .await?;
279 let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
280 self.remote_node.node.upload_blobs(blobs).await?;
281 sent_blobs = true;
282 }
283 result => return Ok(result?),
284 }
285 result = self
286 .remote_node
287 .handle_confirmed_certificate(certificate.clone(), delivery)
288 .await;
289 }
290 }
291
292 async fn send_validated_certificate(
293 &mut self,
294 certificate: GenericCertificate<ValidatedBlock>,
295 delivery: CrossChainMessageDelivery,
296 ) -> Result<Box<ChainInfo>, chain_client::Error> {
297 let result = self
298 .remote_node
299 .handle_optimized_validated_certificate(&certificate, delivery)
300 .await;
301
302 let chain_id = certificate.inner().chain_id();
303 match &result {
304 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
305 self.remote_node
306 .check_blobs_not_found(&certificate, blob_ids)?;
307 let blobs = self
310 .client
311 .local_node
312 .get_locking_blobs(blob_ids, chain_id)
313 .await?
314 .ok_or_else(|| original_err.clone())?;
315 self.remote_node.send_pending_blobs(chain_id, blobs).await?;
316 }
317 Err(error) => {
318 self.sync_if_needed(
319 chain_id,
320 certificate.round,
321 certificate.block().header.height,
322 error,
323 )
324 .await?;
325 }
326 _ => return Ok(result?),
327 }
328 Ok(self
329 .remote_node
330 .handle_validated_certificate(certificate)
331 .await?)
332 }
333
334 async fn request_timeout(
339 &mut self,
340 chain_id: ChainId,
341 round: Round,
342 height: BlockHeight,
343 ) -> Result<Box<ChainInfo>, chain_client::Error> {
344 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
345 let result = self
346 .remote_node
347 .handle_chain_info_query(query.clone())
348 .await;
349 if let Err(err) = &result {
350 self.sync_if_needed(chain_id, round, height, err).await?;
351 }
352 Ok(result?)
353 }
354
355 async fn sync_if_needed(
357 &mut self,
358 chain_id: ChainId,
359 round: Round,
360 height: BlockHeight,
361 error: &NodeError,
362 ) -> Result<(), chain_client::Error> {
363 let address = &self.remote_node.address();
364 match error {
365 NodeError::WrongRound(validator_round) if *validator_round > round => {
366 tracing::debug!(
367 address, %chain_id, %validator_round, %round,
368 "validator is at a higher round; synchronizing",
369 );
370 self.client
371 .synchronize_chain_state_from(&self.remote_node, chain_id)
372 .await?;
373 }
374 NodeError::UnexpectedBlockHeight {
375 expected_block_height,
376 found_block_height,
377 } if expected_block_height > found_block_height => {
378 tracing::debug!(
379 address,
380 %chain_id,
381 %expected_block_height,
382 %found_block_height,
383 "validator is at a higher height; synchronizing",
384 );
385 self.client
386 .synchronize_chain_state_from(&self.remote_node, chain_id)
387 .await?;
388 }
389 NodeError::WrongRound(validator_round) if *validator_round < round => {
390 tracing::debug!(
391 address, %chain_id, %validator_round, %round,
392 "validator is at a lower round; sending chain info",
393 );
394 self.send_chain_information(
395 chain_id,
396 height,
397 CrossChainMessageDelivery::NonBlocking,
398 None,
399 )
400 .await?;
401 }
402 NodeError::UnexpectedBlockHeight {
403 expected_block_height,
404 found_block_height,
405 } if expected_block_height < found_block_height => {
406 tracing::debug!(
407 address,
408 %chain_id,
409 %expected_block_height,
410 %found_block_height,
411 "Validator is at a lower height; sending chain info.",
412 );
413 self.send_chain_information(
414 chain_id,
415 height,
416 CrossChainMessageDelivery::NonBlocking,
417 None,
418 )
419 .await?;
420 }
421 NodeError::InactiveChain(chain_id) => {
422 tracing::debug!(
423 address,
424 %chain_id,
425 "Validator has inactive chain; sending chain info.",
426 );
427 self.send_chain_information(
428 *chain_id,
429 height,
430 CrossChainMessageDelivery::NonBlocking,
431 None,
432 )
433 .await?;
434 }
435 _ => {}
436 }
437 Ok(())
438 }
439
440 async fn send_block_proposal(
441 &mut self,
442 proposal: Box<BlockProposal>,
443 mut blob_ids: Vec<BlobId>,
444 clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
445 ) -> Result<Box<ChainInfo>, chain_client::Error> {
446 let chain_id = proposal.content.block.chain_id;
447 let mut sent_cross_chain_updates = BTreeMap::new();
448 let mut publisher_chain_ids_sent = BTreeSet::new();
449 let storage = self.client.local_node.storage_client();
450 loop {
451 let local_time = storage.clock().current_time();
452 match self
453 .remote_node
454 .handle_block_proposal(proposal.clone())
455 .await
456 {
457 Ok(info) => return Ok(info),
458 Err(NodeError::WrongRound(_round)) => {
459 tracing::debug!(
462 remote_node = self.remote_node.address(),
463 %chain_id,
464 "wrong round; sending chain to validator",
465 );
466 self.send_chain_information(
467 chain_id,
468 proposal.content.block.height,
469 CrossChainMessageDelivery::NonBlocking,
470 None,
471 )
472 .await?;
473 }
474 Err(NodeError::UnexpectedBlockHeight {
475 expected_block_height,
476 found_block_height,
477 }) if expected_block_height < found_block_height
478 && found_block_height == proposal.content.block.height =>
479 {
480 tracing::debug!(
481 remote_node = self.remote_node.address(),
482 %chain_id,
483 "wrong height; sending chain to validator",
484 );
485 self.send_chain_information(
487 chain_id,
488 found_block_height,
489 CrossChainMessageDelivery::NonBlocking,
490 None,
491 )
492 .await?;
493 }
494 Err(NodeError::MissingCrossChainUpdate {
495 chain_id,
496 origin,
497 height,
498 }) if chain_id == proposal.content.block.chain_id
499 && sent_cross_chain_updates
500 .get(&origin)
501 .is_none_or(|h| *h < height) =>
502 {
503 tracing::debug!(
504 remote_node = %self.remote_node.address(),
505 chain_id = %origin,
506 "Missing cross-chain update; sending chain to validator.",
507 );
508 sent_cross_chain_updates.insert(origin, height);
509 self.send_chain_information(
513 origin,
514 height.try_add_one()?,
515 CrossChainMessageDelivery::Blocking,
516 None,
517 )
518 .await?;
519 }
520 Err(NodeError::EventsNotFound(event_ids)) => {
521 let mut publisher_heights = BTreeMap::new();
522 let chain_ids = event_ids
523 .iter()
524 .map(|event_id| event_id.chain_id)
525 .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
526 .collect::<BTreeSet<_>>();
527 tracing::debug!(
528 remote_node = self.remote_node.address(),
529 ?chain_ids,
530 "missing events; sending chains to validator",
531 );
532 ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
533 for chain_id in chain_ids {
534 let height = self
535 .client
536 .local_node
537 .get_next_height_to_preprocess(chain_id)
538 .await?;
539 publisher_heights.insert(chain_id, height);
540 publisher_chain_ids_sent.insert(chain_id);
541 }
542 self.send_chain_info_up_to_heights(
543 publisher_heights,
544 CrossChainMessageDelivery::NonBlocking,
545 )
546 .await?;
547 }
548 Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
549 if !blob_ids.is_empty() =>
550 {
551 tracing::debug!("Missing blobs");
552 let published_blob_ids =
556 BTreeSet::from_iter(proposal.content.block.published_blob_ids());
557 blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
558 let published_blobs = self
559 .client
560 .local_node
561 .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
562 .await?;
563 self.remote_node
564 .send_pending_blobs(chain_id, published_blobs)
565 .await?;
566 let missing_blob_ids = self
567 .remote_node
568 .node
569 .missing_blob_ids(mem::take(&mut blob_ids))
570 .await?;
571
572 tracing::debug!("Sending chains for missing blobs");
573 self.send_chain_info_for_blobs(
574 &missing_blob_ids,
575 CrossChainMessageDelivery::NonBlocking,
576 )
577 .await?;
578 }
579 Err(NodeError::InvalidTimestamp {
580 block_timestamp,
581 local_time: validator_local_time,
582 ..
583 }) => {
584 let clock_skew = local_time.delta_since(validator_local_time);
591 tracing::debug!(
592 remote_node = self.remote_node.address(),
593 %chain_id,
594 %block_timestamp,
595 ?clock_skew,
596 "validator's clock is behind; waiting and retrying",
597 );
598 let _ = clock_skew_sender.send((self.remote_node.public_key, clock_skew));
600 storage
601 .clock()
602 .sleep_until(block_timestamp.saturating_add(clock_skew))
603 .await;
604 }
605 Err(err) => return Err(err.into()),
607 }
608 }
609 }
610
611 async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
612 let local_admin_info = self.client.local_node.chain_info(self.admin_id).await?;
613 Box::pin(self.send_chain_information(
614 self.admin_id,
615 local_admin_info.next_block_height,
616 CrossChainMessageDelivery::NonBlocking,
617 None,
618 ))
619 .await
620 }
621
622 pub async fn send_chain_information(
652 &mut self,
653 chain_id: ChainId,
654 target_block_height: BlockHeight,
655 delivery: CrossChainMessageDelivery,
656 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
657 ) -> Result<(), chain_client::Error> {
658 let info = if target_block_height.0 > 0 {
660 self.sync_chain_height(chain_id, target_block_height, delivery, latest_certificate)
661 .await?
662 } else {
663 self.initialize_new_chain_on_validator(chain_id).await?
664 };
665
666 let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
670 let query = ChainInfoQuery::new(chain_id).with_manager_values();
671 let local_info = match self.client.local_node.handle_chain_info_query(query).await {
672 Ok(response) => response.info,
673 Err(LocalNodeError::BlobsNotFound(_)) => {
677 tracing::debug!("local chain description not fully available, skipping round sync");
678 return Ok(());
679 }
680 Err(error) => return Err(error.into()),
681 };
682
683 let manager = local_info.manager;
684 if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
685 return Ok(());
686 }
687
688 self.sync_consensus_round(remote_round, &manager).await
690 }
691
692 async fn sync_chain_height(
698 &mut self,
699 chain_id: ChainId,
700 target_block_height: BlockHeight,
701 delivery: CrossChainMessageDelivery,
702 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
703 ) -> Result<Box<ChainInfo>, chain_client::Error> {
704 let height = target_block_height.try_sub_one()?;
705
706 let certificate = if let Some(cert) = latest_certificate {
708 cert
709 } else {
710 let hash = self
711 .client
712 .local_node
713 .get_block_hashes(chain_id, vec![height])
714 .await?
715 .into_iter()
716 .next()
717 .ok_or_else(|| {
718 chain_client::Error::InternalError(
719 "send_chain_information called with invalid target_block_height",
720 )
721 })?;
722 self.client
723 .local_node
724 .storage_client()
725 .read_certificate(hash)
726 .await?
727 .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?
728 };
729
730 let info = match self.send_confirmed_certificate(certificate, delivery).await {
732 Ok(info) => info,
733 Err(error) => {
734 tracing::debug!(
735 address = self.remote_node.address(), %error,
736 "validator failed to handle confirmed certificate; sending whole chain",
737 );
738 let query = ChainInfoQuery::new(chain_id);
739 self.remote_node.handle_chain_info_query(query).await?
740 }
741 };
742
743 let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
745 .map(BlockHeight)
746 .collect();
747
748 if heights.is_empty() {
749 return Ok(info);
750 }
751
752 let validator_missing_hashes = self
754 .client
755 .local_node
756 .get_block_hashes(chain_id, heights)
757 .await?;
758
759 let certificates = self
760 .client
761 .local_node
762 .storage_client()
763 .read_certificates(validator_missing_hashes.clone())
764 .await?;
765 let certificates = match ResultReadCertificates::new(certificates, validator_missing_hashes)
766 {
767 ResultReadCertificates::Certificates(certificates) => certificates,
768 ResultReadCertificates::InvalidHashes(hashes) => {
769 return Err(chain_client::Error::ReadCertificatesError(hashes))
770 }
771 };
772 for certificate in certificates {
773 self.send_confirmed_certificate(certificate, delivery)
774 .await?;
775 }
776
777 Ok(info)
778 }
779
780 async fn initialize_new_chain_on_validator(
786 &mut self,
787 chain_id: ChainId,
788 ) -> Result<Box<ChainInfo>, chain_client::Error> {
789 self.send_chain_info_for_blobs(
791 &[BlobId::new(chain_id.0, BlobType::ChainDescription)],
792 CrossChainMessageDelivery::NonBlocking,
793 )
794 .await?;
795
796 let query = ChainInfoQuery::new(chain_id);
798 let info = self.remote_node.handle_chain_info_query(query).await?;
799 Ok(info)
800 }
801
802 async fn sync_consensus_round(
809 &mut self,
810 remote_round: Round,
811 manager: &linera_chain::manager::ChainManagerInfo,
812 ) -> Result<(), chain_client::Error> {
813 for proposal in manager
815 .requested_proposed
816 .iter()
817 .chain(manager.requested_signed_proposal.iter())
818 {
819 if proposal.content.round == manager.current_round {
820 match self
821 .remote_node
822 .handle_block_proposal(proposal.clone())
823 .await
824 {
825 Ok(_) => {
826 tracing::debug!("successfully sent block proposal for round sync");
827 return Ok(());
828 }
829 Err(error) => {
830 tracing::debug!(%error, "failed to send block proposal");
831 }
832 }
833 }
834 }
835
836 if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.as_deref() {
838 if validated.round == manager.current_round {
839 match self
840 .remote_node
841 .handle_optimized_validated_certificate(
842 validated,
843 CrossChainMessageDelivery::NonBlocking,
844 )
845 .await
846 {
847 Ok(_) => {
848 tracing::debug!("successfully sent validated block for round sync");
849 return Ok(());
850 }
851 Err(error) => {
852 tracing::debug!(%error, "failed to send validated block");
853 }
854 }
855 }
856 }
857
858 if let Some(cert) = &manager.timeout {
860 if cert.round >= remote_round {
861 match self
862 .remote_node
863 .handle_timeout_certificate(cert.as_ref().clone())
864 .await
865 {
866 Ok(_) => {
867 tracing::debug!(round = %cert.round, "successfully sent timeout certificate");
868 return Ok(());
869 }
870 Err(error) => {
871 tracing::debug!(%error, round = %cert.round, "failed to send timeout certificate");
872 }
873 }
874 }
875 }
876
877 tracing::debug!("round sync not performed: no applicable data or all attempts failed");
880 Ok(())
881 }
882
883 async fn send_chain_info_for_blobs(
888 &mut self,
889 blob_ids: &[BlobId],
890 delivery: CrossChainMessageDelivery,
891 ) -> Result<(), chain_client::Error> {
892 let blob_states = self
893 .client
894 .local_node
895 .read_blob_states_from_storage(blob_ids)
896 .await?;
897
898 let mut chain_heights = BTreeMap::new();
899 for blob_state in blob_states {
900 let block_chain_id = blob_state.chain_id;
901 let block_height = blob_state.block_height.try_add_one()?;
902 chain_heights
903 .entry(block_chain_id)
904 .and_modify(|h| *h = block_height.max(*h))
905 .or_insert(block_height);
906 }
907
908 self.send_chain_info_up_to_heights(chain_heights, delivery)
909 .await
910 }
911
912 async fn send_chain_info_up_to_heights(
913 &mut self,
914 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
915 delivery: CrossChainMessageDelivery,
916 ) -> Result<(), chain_client::Error> {
917 FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
918 let mut updater = self.clone();
919 async move {
920 updater
921 .send_chain_information(chain_id, height, delivery, None)
922 .await
923 }
924 }))
925 .try_collect::<Vec<_>>()
926 .await?;
927 Ok(())
928 }
929
930 pub async fn send_chain_update(
931 &mut self,
932 action: CommunicateAction,
933 ) -> Result<LiteVote, chain_client::Error> {
934 let chain_id = match &action {
935 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
936 CommunicateAction::FinalizeBlock { certificate, .. } => {
937 certificate.inner().block().header.chain_id
938 }
939 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
940 };
941 let vote = match action {
943 CommunicateAction::SubmitBlock {
944 proposal,
945 blob_ids,
946 clock_skew_sender,
947 } => {
948 let info = self
949 .send_block_proposal(proposal, blob_ids, clock_skew_sender)
950 .await?;
951 info.manager.pending.ok_or_else(|| {
952 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
953 })?
954 }
955 CommunicateAction::FinalizeBlock {
956 certificate,
957 delivery,
958 } => {
959 let info = self
960 .send_validated_certificate(*certificate, delivery)
961 .await?;
962 info.manager.pending.ok_or_else(|| {
963 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
964 })?
965 }
966 CommunicateAction::RequestTimeout { round, height, .. } => {
967 let info = self.request_timeout(chain_id, round, height).await?;
968 info.manager.timeout_vote.ok_or_else(|| {
969 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
970 })?
971 }
972 };
973 vote.check(self.remote_node.public_key)?;
974 Ok(vote)
975 }
976}