1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10 sync::Arc,
11};
12
13use futures::{
14 stream::{FuturesUnordered, TryStreamExt},
15 Future, StreamExt,
16};
17use linera_base::{
18 crypto::ValidatorPublicKey,
19 data_types::{BlockHeight, Round, TimeDelta},
20 ensure,
21 identifiers::{BlobId, BlobType, ChainId, StreamId},
22 time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25 data_types::{BlockProposal, LiteVote},
26 manager::LockingBlock,
27 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{Clock, Storage};
31use thiserror::Error;
32use tokio::sync::mpsc;
33use tracing::{instrument, Level};
34
35use crate::{
36 client::{chain_client, Client},
37 data_types::{ChainInfo, ChainInfoQuery},
38 environment::Environment,
39 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
40 remote_node::RemoteNode,
41 LocalNodeError,
42};
43
/// Default grace period, expressed as a fraction.
///
/// NOTE(review): presumably meant to be passed as the `quorum_grace_period` argument of
/// [`communicate_with_quorum`], where the fraction is multiplied by the time it took to
/// reach a quorum — confirm against callers.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;

/// A clock skew measurement: the public key of the validator that reported an invalid
/// timestamp, together with the time difference computed against that validator's clock.
pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
/// Timeout (one day) used by `communicate_with_quorum` while waiting for the next response
/// as long as no grace-period deadline has been set.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);

/// The request that `ValidatorUpdater::send_chain_update` should make to a validator.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal to the validator.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// IDs of blobs that may have to be made available to the validator if it reports
        /// missing blobs or an inactive chain.
        blob_ids: Vec<BlobId>,
        /// Channel on which clock skew measurements are reported when the validator
        /// rejects the proposal's timestamp.
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    },
    /// Send a validated block certificate to the validator.
    FinalizeBlock {
        /// The certificate for the validated block.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain message delivery mode to request.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask the validator for a timeout vote.
    RequestTimeout {
        /// The chain on which the timeout is requested.
        chain_id: ChainId,
        /// The block height for which the timeout is requested.
        height: BlockHeight,
        /// The round for which the timeout is requested.
        round: Round,
    },
}
72
73impl CommunicateAction {
74 pub fn round(&self) -> Round {
76 match self {
77 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
78 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
79 CommunicateAction::RequestTimeout { round, .. } => *round,
80 }
81 }
82}
83
/// Sends chain data (certificates, proposals, blobs) to a single validator, reading
/// whatever the validator is missing from the local node.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator being updated.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The client whose local node and storage supply the data sent to the validator.
    pub client: Arc<Client<Env>>,
    /// The ID of the admin chain; it is sent to the validator when the validator reports
    /// missing epoch events from that chain.
    pub admin_chain_id: ChainId,
}
92
93impl<Env: Environment> Clone for ValidatorUpdater<Env> {
94 fn clone(&self) -> Self {
95 ValidatorUpdater {
96 remote_node: self.remote_node.clone(),
97 client: self.client.clone(),
98 admin_chain_id: self.admin_chain_id,
99 }
100 }
101}
102
/// The reason why `communicate_with_quorum` could not obtain a quorum of matching responses.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No validator returned an error, but no group of matching responses reached the
    /// quorum threshold. Holds the quorum threshold and, for every group of matching
    /// responses, its accumulated weight and its number of responses.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// The error with the highest accumulated weight was returned by validators whose
    /// combined weight reaches the committee's validity threshold.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold. Holds up to four errors with the
    /// highest accumulated weights, each paired with that weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
121
/// Runs `execute` concurrently against every validator in `validator_clients` that has a
/// nonzero weight in `committee`, and waits for a quorum of matching responses.
///
/// Successful responses are grouped by the key computed with `group_by`; each group
/// accumulates the committee weight of the validators that answered. Waiting stops when:
/// * all responses have arrived,
/// * no group can reach the quorum threshold anymore, or
/// * a group has reached the quorum threshold and the grace period has elapsed. The grace
///   period is the time elapsed until the quorum was reached multiplied by
///   `quorum_grace_period`; it lets slower validators still contribute their response.
///
/// Returns the key of a group that reached the quorum threshold, together with the
/// `(public key, value)` pairs collected for it.
///
/// # Errors
///
/// If no group reached a quorum:
/// * [`CommunicationError::NoConsensus`] if no validator returned an error,
/// * [`CommunicationError::Trusted`] if the error with the highest weight reaches the
///   committee's validity threshold,
/// * [`CommunicationError::Sample`] otherwise, with up to four of the heaviest errors.
pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
    validator_clients: &'a [RemoteNode<A>],
    committee: &Committee,
    group_by: G,
    execute: F,
    quorum_grace_period: f64,
) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
where
    A: ValidatorNode + Clone + 'static,
    F: Clone + Fn(RemoteNode<A>) -> R,
    R: Future<Output = Result<V, chain_client::Error>> + 'a,
    G: Fn(&V) -> K,
    K: Hash + PartialEq + Eq + Clone + 'static,
    V: 'static,
{
    // One future per validator with voting weight; each yields the validator's public key
    // together with the outcome of `execute`.
    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
        .iter()
        .filter_map(|remote_node| {
            if committee.weight(&remote_node.public_key) == 0 {
                // Validators without weight cannot contribute to a quorum.
                return None;
            }
            let execute = execute.clone();
            let remote_node = remote_node.clone();
            Some(async move { (remote_node.public_key, execute(remote_node).await) })
        })
        .collect();

    let start_time = Instant::now();
    // Deadline for late responses; only set once some group has reached a quorum.
    let mut end_time: Option<Instant> = None;
    // Total weight of the validators that have not answered yet.
    let mut remaining_votes = committee.total_votes();
    // Highest accumulated weight among all groups of matching responses.
    let mut highest_key_score = 0;
    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
    let mut error_scores = HashMap::new();

    // The loop ends when the stream is exhausted (`Ok(None)`) or the timeout fires (`Err`).
    'vote_wait: while let Ok(Some((name, result))) = timeout(
        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
        responses.next(),
    )
    .await
    {
        remaining_votes -= committee.weight(&name);
        match result {
            Ok(value) => {
                let key = group_by(&value);
                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
                entry.0 += committee.weight(&name);
                entry.1.push((name, value));
                highest_key_score = highest_key_score.max(entry.0);
            }
            Err(err) => {
                // Errors are tallied as `NodeError`s; any other client error is wrapped
                // using its string representation.
                let err = match err {
                    chain_client::Error::RemoteNodeError(err) => err,
                    err => NodeError::ResponseHandlingError {
                        error: err.to_string(),
                    },
                };
                let entry = error_scores.entry(err.clone()).or_insert(0);
                *entry += committee.weight(&name);
            }
        }
        if highest_key_score + remaining_votes < committee.quorum_threshold() {
            // Even if all remaining validators agreed with the best group, it could not
            // reach a quorum anymore.
            break 'vote_wait;
        }

        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
            // A quorum was just reached: keep waiting for a fraction of the time spent so far.
            // NOTE(review): if `Duration` is `std::time::Duration`, `mul_f64` panics on a
            // negative or non-finite factor — confirm callers validate `quorum_grace_period`.
            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
        }
    }

    // Computed before `value_scores` is consumed below; only used for `NoConsensus`.
    let scores = value_scores
        .values()
        .map(|(weight, values)| (*weight, values.len()))
        .collect();
    if let Some((key, (_, values))) = value_scores
        .into_iter()
        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
    {
        return Ok((key, values));
    }

    // No quorum: report the errors, heaviest first, keeping at most four of them.
    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
    sample.truncate(4);
    Err(match sample.as_slice() {
        [] => CommunicationError::NoConsensus(committee.quorum_threshold(), scores),
        [(_, score), ..] if *score >= committee.validity_threshold() => {
            // The slice pattern guarantees that `sample` has a first element.
            CommunicationError::Trusted(sample.into_iter().next().unwrap().0)
        }
        _ => CommunicationError::Sample(sample),
    })
}
229
230impl<Env> ValidatorUpdater<Env>
231where
232 Env: Environment + 'static,
233{
234 fn warn_if_unexpected(&self, err: &NodeError) {
236 if !err.is_expected() {
237 tracing::warn!(
238 remote_node = self.remote_node.address(),
239 %err,
240 "unexpected error from validator",
241 );
242 }
243 }
244
    /// Sends a confirmed block certificate to the validator, supplying missing data once
    /// and retrying.
    ///
    /// The first attempt uses `handle_optimized_confirmed_certificate`; every retry uses
    /// `handle_confirmed_certificate`. Two kinds of failures are repaired, each at most once:
    /// * `EventsNotFound` where all missing events are epoch events of the admin chain (and
    ///   the certificate is not itself for the admin chain): the admin chain is sent first.
    /// * `BlobsNotFound`: the blobs are read from local storage and uploaded.
    ///
    /// Any other result is returned as is, after warning about unexpected errors.
    ///
    /// # Errors
    ///
    /// Returns the validator's error, or any error that occurred while updating the admin
    /// chain, checking the reported blob IDs, reading the blobs or uploading them. If the
    /// blobs are not all available locally, `NodeError::BlobsNotFound` is returned.
    #[instrument(
        level = "trace", skip_all, err(level = Level::DEBUG),
        fields(chain_id = %certificate.block().header.chain_id)
    )]
    async fn send_confirmed_certificate(
        &mut self,
        certificate: GenericCertificate<ConfirmedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let mut result = self
            .remote_node
            .handle_optimized_confirmed_certificate(&certificate, delivery)
            .await;

        // Each repair is attempted only once, so the loop runs at most three times.
        let mut sent_admin_chain = false;
        let mut sent_blobs = false;
        loop {
            match result {
                Err(NodeError::EventsNotFound(event_ids))
                    if !sent_admin_chain
                        && certificate.inner().chain_id() != self.admin_chain_id
                        && event_ids.iter().all(|event_id| {
                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
                                && event_id.chain_id == self.admin_chain_id
                        }) =>
                {
                    // The validator lacks epoch events from the admin chain: bring its
                    // copy of the admin chain up to date.
                    self.update_admin_chain().await?;
                    sent_admin_chain = true;
                }
                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
                    // Validate the reported blob IDs against the certificate before
                    // reading anything from storage.
                    self.remote_node
                        .check_blobs_not_found(&certificate, &blob_ids)?;
                    let maybe_blobs = self
                        .client
                        .local_node
                        .read_blobs_from_storage(&blob_ids)
                        .await?;
                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
                    self.remote_node.node.upload_blobs(blobs).await?;
                    sent_blobs = true;
                }
                result => {
                    // Success, or an error that cannot be repaired here.
                    if let Err(err) = &result {
                        self.warn_if_unexpected(err);
                    }
                    return Ok(result?);
                }
            }
            result = self
                .remote_node
                .handle_confirmed_certificate(certificate.clone(), delivery)
                .await;
        }
    }
302
    /// Sends a validated block certificate to the validator, retrying once after trying to
    /// repair the cause of a failure.
    ///
    /// The first attempt uses `handle_optimized_validated_certificate`. If it fails with
    /// `BlobsNotFound`, the locking blobs are fetched from the local node and sent as
    /// pending blobs; on any other error `sync_if_needed` is given the chance to
    /// synchronize the validator or the local node. The certificate is then sent again
    /// with `handle_validated_certificate`, and that second result is final.
    ///
    /// # Errors
    ///
    /// Returns the error of the second attempt, or any error raised while repairing. If
    /// the local node does not have the locking blobs, the original `BlobsNotFound` error
    /// is returned.
    async fn send_validated_certificate(
        &mut self,
        certificate: GenericCertificate<ValidatedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let result = self
            .remote_node
            .handle_optimized_validated_certificate(&certificate, delivery)
            .await;

        let chain_id = certificate.inner().chain_id();
        match &result {
            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
                // Validate the reported blob IDs against the certificate first.
                self.remote_node
                    .check_blobs_not_found(&certificate, blob_ids)?;
                let blobs = self
                    .client
                    .local_node
                    .get_locking_blobs(blob_ids, chain_id)
                    .await?
                    .ok_or_else(|| original_err.clone())?;
                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
            }
            Err(error) => {
                self.sync_if_needed(
                    chain_id,
                    certificate.round,
                    certificate.block().header.height,
                    error,
                )
                .await?;
            }
            // The first attempt succeeded.
            _ => return Ok(result?),
        }
        // Second and last attempt.
        let result = self
            .remote_node
            .handle_validated_certificate(certificate)
            .await;
        if let Err(err) = &result {
            self.warn_if_unexpected(err);
        }
        Ok(result?)
    }
348
349 async fn request_timeout(
354 &mut self,
355 chain_id: ChainId,
356 round: Round,
357 height: BlockHeight,
358 ) -> Result<Box<ChainInfo>, chain_client::Error> {
359 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
360 let result = self
361 .remote_node
362 .handle_chain_info_query(query.clone())
363 .await;
364 if let Err(err) = &result {
365 self.sync_if_needed(chain_id, round, height, err).await?;
366 self.warn_if_unexpected(err);
367 }
368 Ok(result?)
369 }
370
    /// Reacts to an error returned by the validator for a request about `round` at
    /// `height` on chain `chain_id` by synchronizing in the appropriate direction.
    ///
    /// * `WrongRound` with a validator round above `round`, or `UnexpectedBlockHeight`
    ///   with `expected_block_height > found_block_height`: the local node synchronizes
    ///   the chain state from this validator.
    /// * `WrongRound` with a validator round below `round`, or `UnexpectedBlockHeight`
    ///   with `expected_block_height < found_block_height`: the chain is sent to the
    ///   validator up to `height`.
    /// * `InactiveChain`: the chain named in the error is sent to the validator up to
    ///   `height`.
    /// * Any other error (including a `WrongRound` equal to `round`) is ignored.
    ///
    /// # Errors
    ///
    /// Returns an error only if the synchronization itself fails.
    async fn sync_if_needed(
        &mut self,
        chain_id: ChainId,
        round: Round,
        height: BlockHeight,
        error: &NodeError,
    ) -> Result<(), chain_client::Error> {
        let address = &self.remote_node.address();
        match error {
            NodeError::WrongRound(validator_round) if *validator_round > round => {
                tracing::debug!(
                    address, %chain_id, %validator_round, %round,
                    "validator is at a higher round; synchronizing",
                );
                self.client
                    .synchronize_chain_state_from(&self.remote_node, chain_id)
                    .await?;
            }
            NodeError::UnexpectedBlockHeight {
                expected_block_height,
                found_block_height,
            } if expected_block_height > found_block_height => {
                tracing::debug!(
                    address,
                    %chain_id,
                    %expected_block_height,
                    %found_block_height,
                    "validator is at a higher height; synchronizing",
                );
                self.client
                    .synchronize_chain_state_from(&self.remote_node, chain_id)
                    .await?;
            }
            NodeError::WrongRound(validator_round) if *validator_round < round => {
                tracing::debug!(
                    address, %chain_id, %validator_round, %round,
                    "validator is at a lower round; sending chain info",
                );
                self.send_chain_information(
                    chain_id,
                    height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            NodeError::UnexpectedBlockHeight {
                expected_block_height,
                found_block_height,
            } if expected_block_height < found_block_height => {
                tracing::debug!(
                    address,
                    %chain_id,
                    %expected_block_height,
                    %found_block_height,
                    "Validator is at a lower height; sending chain info.",
                );
                self.send_chain_information(
                    chain_id,
                    height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            // Note: this `chain_id` shadows the parameter; it is the chain that the
            // validator reported as inactive.
            NodeError::InactiveChain(chain_id) => {
                tracing::debug!(
                    address,
                    %chain_id,
                    "Validator has inactive chain; sending chain info.",
                );
                self.send_chain_information(
                    *chain_id,
                    height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            _ => {}
        }
        Ok(())
    }
455
    /// Submits a block proposal to the validator, repeatedly supplying whatever the
    /// validator reports as missing until the proposal is accepted or an error occurs
    /// that cannot be repaired here.
    ///
    /// Handled failures:
    /// * `WrongRound`: the proposal's chain is sent up to the proposal's height.
    /// * `UnexpectedBlockHeight` where the validator is behind and the found height is the
    ///   proposal's height: the chain is sent up to that height.
    /// * `MissingCrossChainUpdate` for the proposal's chain: the origin chain is sent up
    ///   to and including the reported height, unless that height was already covered.
    /// * `EventsNotFound`: the publisher chains not sent before are sent; if there is no
    ///   such chain, the error is returned.
    /// * `BlobsNotFound` / `InactiveChain` while `blob_ids` is non-empty: the blobs
    ///   published by the proposal are sent as pending blobs, and the chains containing
    ///   the other blobs the validator is missing are sent. This happens at most once.
    /// * `InvalidTimestamp`: the clock skew is reported on `clock_skew_sender` and the
    ///   request is retried after waiting.
    ///
    /// # Errors
    ///
    /// Returns the validator's error if it is not handled above, or any error raised
    /// while supplying the missing data.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // Highest height already sent for each origin chain of a missing cross-chain update.
        let mut sent_cross_chain_updates = BTreeMap::new();
        // Publisher chains that were already sent in response to missing events.
        let mut publisher_chain_ids_sent = BTreeSet::new();
        let storage = self.client.local_node.storage_client();
        loop {
            // Taken before the request so that it can be compared with the validator's
            // local time if the timestamp is rejected.
            let local_time = storage.clock().current_time();
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                Err(NodeError::WrongRound(_round)) => {
                    // NOTE(review): this arm has no retry guard; if the validator keeps
                    // answering `WrongRound`, the loop does not terminate on its own.
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // The guard only retries if this origin was not sent yet, or only up to a
                // lower height; otherwise the error falls through to the final arm.
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // The target height is exclusive, hence `height + 1` to include the
                    // block at `height`.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    // If every publisher chain was already sent, retrying cannot help.
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // `blob_ids` is emptied below, so this arm matches at most once.
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // Blobs published by the proposal itself are sent as pending blobs;
                    // they are removed from `blob_ids` so only the others are looked up.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;

                    tracing::debug!("Sending chains for missing blobs");
                    self.send_chain_info_for_blobs(
                        &missing_blob_ids,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::InvalidTimestamp {
                    block_timestamp,
                    local_time: validator_local_time,
                    ..
                }) => {
                    // Difference between our clock (read before the request) and the
                    // validator's clock as reported in the error.
                    let clock_skew = local_time.delta_since(validator_local_time);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %block_timestamp,
                        ?clock_skew,
                        "validator's clock is behind; waiting and retrying",
                    );
                    // A closed channel is not an error: the report is best-effort.
                    clock_skew_sender
                        .send((self.remote_node.public_key, clock_skew))
                        .ok();
                    storage
                        .clock()
                        .sleep_until(block_timestamp.saturating_add(clock_skew))
                        .await;
                }
                Err(err) => {
                    self.warn_if_unexpected(&err);
                    return Err(err.into());
                }
            }
        }
    }
632
633 async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
634 let local_admin_info = self
635 .client
636 .local_node
637 .chain_info(self.admin_chain_id)
638 .await?;
639 Box::pin(self.send_chain_information(
640 self.admin_chain_id,
641 local_admin_info.next_block_height,
642 CrossChainMessageDelivery::NonBlocking,
643 None,
644 ))
645 .await
646 }
647
648 #[instrument(level = "trace", skip_all)]
678 pub async fn send_chain_information(
679 &mut self,
680 chain_id: ChainId,
681 target_block_height: BlockHeight,
682 delivery: CrossChainMessageDelivery,
683 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
684 ) -> Result<(), chain_client::Error> {
685 let info = if target_block_height.0 > 0 {
687 self.sync_chain_height(chain_id, target_block_height, delivery, latest_certificate)
688 .await?
689 } else {
690 self.initialize_new_chain_on_validator(chain_id).await?
691 };
692
693 let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
697 let query = ChainInfoQuery::new(chain_id).with_manager_values();
698 let local_info = match self.client.local_node.handle_chain_info_query(query).await {
699 Ok(response) => response.info,
700 Err(LocalNodeError::BlobsNotFound(_)) => {
704 tracing::debug!("local chain description not fully available, skipping round sync");
705 return Ok(());
706 }
707 Err(error) => return Err(error.into()),
708 };
709
710 let manager = local_info.manager;
711 if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
712 return Ok(());
713 }
714
715 self.sync_consensus_round(remote_round, &manager).await
717 }
718
719 async fn sync_chain_height(
725 &mut self,
726 chain_id: ChainId,
727 target_block_height: BlockHeight,
728 delivery: CrossChainMessageDelivery,
729 latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
730 ) -> Result<Box<ChainInfo>, chain_client::Error> {
731 let height = target_block_height.try_sub_one()?;
732
733 let certificate = if let Some(cert) = latest_certificate {
735 cert
736 } else {
737 self.read_certificates_for_heights(chain_id, vec![height])
738 .await?
739 .into_iter()
740 .next()
741 .ok_or_else(|| {
742 chain_client::Error::InternalError(
743 "failed to read latest certificate for height sync",
744 )
745 })?
746 };
747
748 let info = match self.send_confirmed_certificate(certificate, delivery).await {
750 Ok(info) => info,
751 Err(error) => {
752 tracing::debug!(
753 address = self.remote_node.address(), %error,
754 "validator failed to handle confirmed certificate; sending whole chain",
755 );
756 let query = ChainInfoQuery::new(chain_id);
757 self.remote_node.handle_chain_info_query(query).await?
758 }
759 };
760
761 let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
763 .map(BlockHeight)
764 .collect();
765
766 if heights.is_empty() {
767 return Ok(info);
768 }
769
770 let certificates = self
772 .read_certificates_for_heights(chain_id, heights)
773 .await?;
774
775 for certificate in certificates {
776 self.send_confirmed_certificate(certificate, delivery)
777 .await?;
778 }
779
780 Ok(info)
781 }
782
783 async fn read_certificates_for_heights(
785 &self,
786 chain_id: ChainId,
787 heights: Vec<BlockHeight>,
788 ) -> Result<Vec<GenericCertificate<ConfirmedBlock>>, chain_client::Error> {
789 let storage = self.client.local_node.storage_client();
790
791 let certificates_by_height = storage
792 .read_certificates_by_heights(chain_id, &heights)
793 .await?;
794
795 Ok(certificates_by_height.into_iter().flatten().collect())
796 }
797
798 async fn initialize_new_chain_on_validator(
804 &self,
805 chain_id: ChainId,
806 ) -> Result<Box<ChainInfo>, chain_client::Error> {
807 self.send_chain_info_for_blobs(
809 &[BlobId::new(chain_id.0, BlobType::ChainDescription)],
810 CrossChainMessageDelivery::NonBlocking,
811 )
812 .await?;
813
814 let query = ChainInfoQuery::new(chain_id);
816 let info = self.remote_node.handle_chain_info_query(query).await?;
817 Ok(info)
818 }
819
    /// Tries to bring the validator to the round described by the local chain `manager`.
    ///
    /// The following are attempted in order, stopping at the first success:
    /// 1. every locally known proposal (plain, then signed) for the manager's current round,
    /// 2. the locking block, if it is a regular validated block for the current round,
    /// 3. the timeout certificate, if its round is at least `remote_round`.
    ///
    /// Failed attempts are only logged at debug level. The function always returns
    /// `Ok(())`, even if nothing was sent or every attempt failed.
    async fn sync_consensus_round(
        &self,
        remote_round: Round,
        manager: &linera_chain::manager::ChainManagerInfo,
    ) -> Result<(), chain_client::Error> {
        for proposal in manager
            .requested_proposed
            .iter()
            .chain(manager.requested_signed_proposal.iter())
        {
            if proposal.content.round == manager.current_round {
                match self
                    .remote_node
                    .handle_block_proposal(proposal.clone())
                    .await
                {
                    Ok(_) => {
                        tracing::debug!("successfully sent block proposal for round sync");
                        return Ok(());
                    }
                    Err(error) => {
                        tracing::debug!(%error, "failed to send block proposal");
                    }
                }
            }
        }

        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.as_deref() {
            if validated.round == manager.current_round {
                match self
                    .remote_node
                    .handle_optimized_validated_certificate(
                        validated,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await
                {
                    Ok(_) => {
                        tracing::debug!("successfully sent validated block for round sync");
                        return Ok(());
                    }
                    Err(error) => {
                        tracing::debug!(%error, "failed to send validated block");
                    }
                }
            }
        }

        if let Some(cert) = &manager.timeout {
            // Unlike the checks above, this compares against the validator's round
            // rather than the local current round.
            if cert.round >= remote_round {
                match self
                    .remote_node
                    .handle_timeout_certificate(cert.as_ref().clone())
                    .await
                {
                    Ok(_) => {
                        tracing::debug!(round = %cert.round, "successfully sent timeout certificate");
                        return Ok(());
                    }
                    Err(error) => {
                        tracing::debug!(%error, round = %cert.round, "failed to send timeout certificate");
                    }
                }
            }
        }

        tracing::debug!("round sync not performed: no applicable data or all attempts failed");
        Ok(())
    }
900
901 async fn send_chain_info_for_blobs(
907 &self,
908 blob_ids: &[BlobId],
909 delivery: CrossChainMessageDelivery,
910 ) -> Result<(), chain_client::Error> {
911 let blob_states = self
912 .client
913 .local_node
914 .read_blob_states_from_storage(blob_ids)
915 .await?;
916
917 let mut chain_heights: BTreeMap<ChainId, BTreeSet<BlockHeight>> = BTreeMap::new();
918 for blob_state in blob_states {
919 let block_chain_id = blob_state.chain_id;
920 let block_height = blob_state.block_height;
921 chain_heights
922 .entry(block_chain_id)
923 .or_default()
924 .insert(block_height);
925 }
926
927 self.send_chain_info_at_heights(chain_heights, delivery)
928 .await
929 }
930
931 async fn send_chain_info_at_heights(
937 &self,
938 chain_heights: impl IntoIterator<Item = (ChainId, BTreeSet<BlockHeight>)>,
939 delivery: CrossChainMessageDelivery,
940 ) -> Result<(), chain_client::Error> {
941 chain_heights
942 .into_iter()
943 .map(|(chain_id, heights)| {
944 let mut updater = self.clone();
945 async move {
946 let heights_vec: Vec<_> = heights.into_iter().collect();
948 let certificates = updater
949 .client
950 .local_node
951 .storage_client()
952 .read_certificates_by_heights(chain_id, &heights_vec)
953 .await?
954 .into_iter()
955 .flatten()
956 .collect::<Vec<_>>();
957
958 for certificate in certificates {
960 updater
961 .send_confirmed_certificate(certificate, delivery)
962 .await?;
963 }
964
965 Ok::<_, chain_client::Error>(())
966 }
967 })
968 .collect::<FuturesUnordered<_>>()
969 .try_collect::<Vec<_>>()
970 .await?;
971 Ok(())
972 }
973
974 async fn send_chain_info_up_to_heights(
975 &self,
976 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
977 delivery: CrossChainMessageDelivery,
978 ) -> Result<(), chain_client::Error> {
979 chain_heights
980 .into_iter()
981 .map(|(chain_id, height)| {
982 let mut updater = self.clone();
983 async move {
984 updater
985 .send_chain_information(chain_id, height, delivery, None)
986 .await
987 }
988 })
989 .collect::<FuturesUnordered<_>>()
990 .try_collect::<Vec<_>>()
991 .await?;
992 Ok(())
993 }
994
995 pub async fn send_chain_update(
996 &mut self,
997 action: CommunicateAction,
998 ) -> Result<LiteVote, chain_client::Error> {
999 let chain_id = match &action {
1000 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
1001 CommunicateAction::FinalizeBlock { certificate, .. } => {
1002 certificate.inner().block().header.chain_id
1003 }
1004 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
1005 };
1006 let vote = match action {
1008 CommunicateAction::SubmitBlock {
1009 proposal,
1010 blob_ids,
1011 clock_skew_sender,
1012 } => {
1013 let info = self
1014 .send_block_proposal(proposal, blob_ids, clock_skew_sender)
1015 .await?;
1016 info.manager.pending.ok_or_else(|| {
1017 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
1018 })?
1019 }
1020 CommunicateAction::FinalizeBlock {
1021 certificate,
1022 delivery,
1023 } => {
1024 let info = self
1025 .send_validated_certificate(*certificate, delivery)
1026 .await?;
1027 info.manager.pending.ok_or_else(|| {
1028 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
1029 })?
1030 }
1031 CommunicateAction::RequestTimeout { round, height, .. } => {
1032 let info = self.request_timeout(chain_id, round, height).await?;
1033 info.manager.timeout_vote.ok_or_else(|| {
1034 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
1035 })?
1036 }
1037 };
1038 vote.check(self.remote_node.public_key)?;
1039 Ok(vote)
1040 }
1041}