1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10 sync::Arc,
11};
12
13use futures::{
14 stream::{FuturesUnordered, TryStreamExt},
15 Future, StreamExt,
16};
17use linera_base::{
18 crypto::ValidatorPublicKey,
19 data_types::{BlockHeight, Round, TimeDelta},
20 ensure,
21 identifiers::{BlobId, BlobType, ChainId, StreamId},
22 time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25 data_types::{BlockProposal, LiteVote},
26 manager::LockingBlock,
27 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{Clock, ResultReadCertificates, Storage};
31use thiserror::Error;
32use tokio::sync::mpsc;
33use tracing::{instrument, Level};
34
35use crate::{
36 client::{chain_client, Client},
37 data_types::{ChainInfo, ChainInfoQuery},
38 environment::Environment,
39 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
40 remote_node::RemoteNode,
41 LocalNodeError,
42};
43
/// Default quorum grace period, expressed as a fraction.
///
/// NOTE(review): presumably the default for the `quorum_grace_period` argument of
/// [`communicate_with_quorum`], which multiplies the time elapsed until a quorum was reached
/// by this factor to decide how much longer to keep collecting responses — confirm at the
/// call sites.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;

/// A validator's public key together with a time delta.
///
/// `ValidatorUpdater::send_block_proposal` sends one of these when a validator rejects a
/// proposal with `NodeError::InvalidTimestamp`; the delta is our local time minus the local
/// time reported by the validator.
pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
/// Longest time (24 hours) that [`communicate_with_quorum`] waits for the next validator
/// response while no grace-period deadline has been set.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);

/// A request that `ValidatorUpdater::send_chain_update` sends to a validator in order to
/// obtain a vote.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal; the validator's pending vote is expected in return.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs consulted if the validator reports missing blobs or an inactive chain.
        blob_ids: Vec<BlobId>,
        /// Channel on which clock-skew observations are reported when the validator rejects
        /// the proposal because of its timestamp.
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    },
    /// Send a validated-block certificate; the validator's pending vote is expected in return.
    FinalizeBlock {
        /// The certificate for the validated block.
        certificate: Box<ValidatedBlockCertificate>,
        /// Cross-chain delivery mode forwarded with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask for a timeout vote for the given chain, height and round.
    RequestTimeout {
        /// The chain for which a timeout is requested.
        chain_id: ChainId,
        /// The block height for which a timeout is requested.
        height: BlockHeight,
        /// The round for which a timeout is requested.
        round: Round,
    },
}
72
73impl CommunicateAction {
74 pub fn round(&self) -> Round {
76 match self {
77 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
78 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
79 CommunicateAction::RequestTimeout { round, .. } => *round,
80 }
81 }
82}
83
/// Sends chain information, certificates, blobs and proposals to a single validator, using
/// the local node and storage reachable through `client` as the source of data.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator being updated.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// Shared client; gives access to the local node and its storage.
    pub client: Arc<Client<Env>>,
    /// ID of the admin chain, which is sent to the validator when it is missing epoch events.
    pub admin_id: ChainId,
}
92
93impl<Env: Environment> Clone for ValidatorUpdater<Env> {
94 fn clone(&self) -> Self {
95 ValidatorUpdater {
96 remote_node: self.remote_node.clone(),
97 client: self.client.clone(),
98 admin_id: self.admin_id,
99 }
100 }
101}
102
/// Reasons why [`communicate_with_quorum`] failed to obtain a quorum of matching responses.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No validator returned an error, yet no group of matching responses reached the quorum
    /// threshold. Carries the quorum threshold and, for every group of matching responses,
    /// its accumulated weight and the number of responses in it.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// The same error was returned by validators whose combined weight reached the
    /// committee's validity threshold.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No quorum was reached and no single error reached the validity threshold. Carries at
    /// most four of the observed errors, heaviest first, each with its accumulated weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
121
122pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
129 validator_clients: &'a [RemoteNode<A>],
130 committee: &Committee,
131 group_by: G,
132 execute: F,
133 quorum_grace_period: f64,
135) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
136where
137 A: ValidatorNode + Clone + 'static,
138 F: Clone + Fn(RemoteNode<A>) -> R,
139 R: Future<Output = Result<V, chain_client::Error>> + 'a,
140 G: Fn(&V) -> K,
141 K: Hash + PartialEq + Eq + Clone + 'static,
142 V: 'static,
143{
144 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
145 .iter()
146 .filter_map(|remote_node| {
147 if committee.weight(&remote_node.public_key) == 0 {
148 return None;
151 }
152 let execute = execute.clone();
153 let remote_node = remote_node.clone();
154 Some(async move { (remote_node.public_key, execute(remote_node).await) })
155 })
156 .collect();
157
158 let start_time = Instant::now();
159 let mut end_time: Option<Instant> = None;
160 let mut remaining_votes = committee.total_votes();
161 let mut highest_key_score = 0;
162 let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
163 let mut error_scores = HashMap::new();
164
165 'vote_wait: while let Ok(Some((name, result))) = timeout(
166 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
167 responses.next(),
168 )
169 .await
170 {
171 remaining_votes -= committee.weight(&name);
172 match result {
173 Ok(value) => {
174 let key = group_by(&value);
175 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
176 entry.0 += committee.weight(&name);
177 entry.1.push((name, value));
178 highest_key_score = highest_key_score.max(entry.0);
179 }
180 Err(err) => {
181 let err = match err {
183 chain_client::Error::RemoteNodeError(err) => err,
184 err => NodeError::ResponseHandlingError {
185 error: err.to_string(),
186 },
187 };
188 let entry = error_scores.entry(err.clone()).or_insert(0);
189 *entry += committee.weight(&name);
190 if *entry >= committee.validity_threshold() {
191 return Err(CommunicationError::Trusted(err));
194 }
195 }
196 }
197 if highest_key_score + remaining_votes < committee.quorum_threshold() {
199 break 'vote_wait;
200 }
201
202 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
205 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
206 }
207 }
208
209 let scores = value_scores
210 .values()
211 .map(|(weight, values)| (*weight, values.len()))
212 .collect();
213 if let Some((key, (_, values))) = value_scores
215 .into_iter()
216 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
217 {
218 return Ok((key, values));
219 }
220
221 if error_scores.is_empty() {
222 return Err(CommunicationError::NoConsensus(
223 committee.quorum_threshold(),
224 scores,
225 ));
226 }
227
228 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
230 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
231 sample.truncate(4);
232 Err(CommunicationError::Sample(sample))
233}
234
235impl<Env> ValidatorUpdater<Env>
236where
237 Env: Environment + 'static,
238{
239 #[instrument(
240 level = "trace", skip_all, err(level = Level::WARN),
241 fields(chain_id = %certificate.block().header.chain_id)
242 )]
243 async fn send_confirmed_certificate(
244 &mut self,
245 certificate: GenericCertificate<ConfirmedBlock>,
246 delivery: CrossChainMessageDelivery,
247 ) -> Result<Box<ChainInfo>, chain_client::Error> {
248 let mut result = self
249 .remote_node
250 .handle_optimized_confirmed_certificate(&certificate, delivery)
251 .await;
252
253 let mut sent_admin_chain = false;
254 let mut sent_blobs = false;
255 loop {
256 match result {
257 Err(NodeError::EventsNotFound(event_ids))
258 if !sent_admin_chain
259 && certificate.inner().chain_id() != self.admin_id
260 && event_ids.iter().all(|event_id| {
261 event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
262 && event_id.chain_id == self.admin_id
263 }) =>
264 {
265 self.update_admin_chain().await?;
267 sent_admin_chain = true;
268 }
269 Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
270 self.remote_node
272 .check_blobs_not_found(&certificate, &blob_ids)?;
273 let maybe_blobs = self
275 .client
276 .local_node
277 .read_blobs_from_storage(&blob_ids)
278 .await?;
279 let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
280 self.remote_node.node.upload_blobs(blobs).await?;
281 sent_blobs = true;
282 }
283 result => return Ok(result?),
284 }
285 result = self
286 .remote_node
287 .handle_confirmed_certificate(certificate.clone(), delivery)
288 .await;
289 }
290 }
291
292 async fn send_validated_certificate(
293 &mut self,
294 certificate: GenericCertificate<ValidatedBlock>,
295 delivery: CrossChainMessageDelivery,
296 ) -> Result<Box<ChainInfo>, chain_client::Error> {
297 let result = self
298 .remote_node
299 .handle_optimized_validated_certificate(&certificate, delivery)
300 .await;
301
302 let chain_id = certificate.inner().chain_id();
303 match &result {
304 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
305 self.remote_node
306 .check_blobs_not_found(&certificate, blob_ids)?;
307 let blobs = self
310 .client
311 .local_node
312 .get_locking_blobs(blob_ids, chain_id)
313 .await?
314 .ok_or_else(|| original_err.clone())?;
315 self.remote_node.send_pending_blobs(chain_id, blobs).await?;
316 }
317 Err(error) => {
318 self.sync_if_needed(
319 chain_id,
320 certificate.round,
321 certificate.block().header.height,
322 error,
323 )
324 .await?;
325 }
326 _ => return Ok(result?),
327 }
328 Ok(self
329 .remote_node
330 .handle_validated_certificate(certificate)
331 .await?)
332 }
333
334 async fn request_timeout(
339 &mut self,
340 chain_id: ChainId,
341 round: Round,
342 height: BlockHeight,
343 ) -> Result<Box<ChainInfo>, chain_client::Error> {
344 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
345 let result = self
346 .remote_node
347 .handle_chain_info_query(query.clone())
348 .await;
349 if let Err(err) = &result {
350 self.sync_if_needed(chain_id, round, height, err).await?;
351 }
352 Ok(result?)
353 }
354
355 async fn sync_if_needed(
357 &mut self,
358 chain_id: ChainId,
359 round: Round,
360 height: BlockHeight,
361 error: &NodeError,
362 ) -> Result<(), chain_client::Error> {
363 let address = &self.remote_node.address();
364 match error {
365 NodeError::WrongRound(validator_round) if *validator_round > round => {
366 tracing::debug!(
367 address, %chain_id, %validator_round, %round,
368 "validator is at a higher round; synchronizing",
369 );
370 self.client
371 .synchronize_chain_state_from(&self.remote_node, chain_id)
372 .await?;
373 }
374 NodeError::UnexpectedBlockHeight {
375 expected_block_height,
376 found_block_height,
377 } if expected_block_height > found_block_height => {
378 tracing::debug!(
379 address,
380 %chain_id,
381 %expected_block_height,
382 %found_block_height,
383 "validator is at a higher height; synchronizing",
384 );
385 self.client
386 .synchronize_chain_state_from(&self.remote_node, chain_id)
387 .await?;
388 }
389 NodeError::WrongRound(validator_round) if *validator_round < round => {
390 tracing::debug!(
391 address, %chain_id, %validator_round, %round,
392 "validator is at a lower round; sending chain info",
393 );
394 self.send_chain_information(
395 chain_id,
396 height,
397 CrossChainMessageDelivery::NonBlocking,
398 None,
399 )
400 .await?;
401 }
402 NodeError::UnexpectedBlockHeight {
403 expected_block_height,
404 found_block_height,
405 } if expected_block_height < found_block_height => {
406 tracing::debug!(
407 address,
408 %chain_id,
409 %expected_block_height,
410 %found_block_height,
411 "Validator is at a lower height; sending chain info.",
412 );
413 self.send_chain_information(
414 chain_id,
415 height,
416 CrossChainMessageDelivery::NonBlocking,
417 None,
418 )
419 .await?;
420 }
421 NodeError::InactiveChain(chain_id) => {
422 tracing::debug!(
423 address,
424 %chain_id,
425 "Validator has inactive chain; sending chain info.",
426 );
427 self.send_chain_information(
428 *chain_id,
429 height,
430 CrossChainMessageDelivery::NonBlocking,
431 None,
432 )
433 .await?;
434 }
435 _ => {}
436 }
437 Ok(())
438 }
439
    /// Submits a block proposal to the validator, repairing recoverable failures and
    /// retrying until the proposal is accepted or an unrecoverable error occurs.
    ///
    /// Each loop iteration sends the proposal. Depending on the error, the validator is
    /// brought up to date and the proposal is sent again:
    ///
    /// * `WrongRound`: chain information for the proposal's chain is sent.
    /// * `UnexpectedBlockHeight` where the validator is behind the proposal's height: chain
    ///   information up to that height is sent.
    /// * `MissingCrossChainUpdate` for the proposal's chain: the origin chain is sent, unless
    ///   that origin was already sent up to at least the same height.
    /// * `EventsNotFound`: the publisher chains of the missing events that were not sent
    ///   before are sent; if there are none left to send, the error is returned.
    /// * `BlobsNotFound` / `InactiveChain` while `blob_ids` is non-empty: published blobs are
    ///   sent as pending blobs, and the chains recorded in the blob states of the remaining
    ///   missing blobs are sent. `blob_ids` is emptied in the process, so this happens at
    ///   most once.
    /// * `InvalidTimestamp`: the observed clock skew is reported on `clock_skew_sender` and
    ///   the method sleeps until the block timestamp plus that skew before retrying.
    ///
    /// Any other error is returned to the caller.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // Bookkeeping that keeps the retry loop from repeating the same repair forever:
        // origin chain -> height already sent, and publisher chains already sent.
        let mut sent_cross_chain_updates = BTreeMap::new();
        let mut publisher_chain_ids_sent = BTreeSet::new();
        let storage = self.client.local_node.storage_client();
        loop {
            // Taken before the request so it can be compared with the validator's local
            // time if the proposal is rejected because of its timestamp.
            let local_time = storage.clock().current_time();
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                Err(NodeError::WrongRound(_round)) => {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // Only handled when the validator is behind and the height it rejected is
                // exactly the height of this proposal.
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // Only handled for our own chain, and only if this origin has not already
                // been sent up to (at least) the requested height.
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // The target height is exclusive, hence `height + 1`. Delivery is
                    // blocking here, unlike the other repairs in this method.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    // Publisher chains of the missing events that were not sent before.
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    // Nothing new to send: give up and report the validator's error.
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // Blobs published by this very block are sent directly as pending
                    // blobs and removed from the list of blobs to look up.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // `mem::take` leaves `blob_ids` empty, so this arm's guard fails on
                    // any later iteration and the error is returned instead.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;
                    let blob_states = self
                        .client
                        .local_node
                        .read_blob_states_from_storage(&missing_blob_ids)
                        .await?;
                    // For every chain recorded in a blob state, keep the highest
                    // (exclusive) height that has to be sent.
                    let mut chain_heights = BTreeMap::new();
                    for blob_state in blob_states {
                        let block_chain_id = blob_state.chain_id;
                        let block_height = blob_state.block_height.try_add_one()?;
                        chain_heights
                            .entry(block_chain_id)
                            .and_modify(|h| *h = block_height.max(*h))
                            .or_insert(block_height);
                    }
                    tracing::debug!("Sending chains {chain_heights:?}");

                    self.send_chain_info_up_to_heights(
                        chain_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::InvalidTimestamp {
                    block_timestamp,
                    local_time: validator_local_time,
                    ..
                }) => {
                    // Difference between our clock (sampled before the request) and the
                    // local time the validator reported.
                    let clock_skew = local_time.delta_since(validator_local_time);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %block_timestamp,
                        ?clock_skew,
                        "validator's clock is behind; waiting and retrying",
                    );
                    // Best effort: a failed send is deliberately ignored.
                    let _ = clock_skew_sender.send((self.remote_node.public_key, clock_skew));
                    // Wait until the block timestamp, shifted by the observed skew, has
                    // passed on our clock; then try again.
                    storage
                        .clock()
                        .sleep_until(block_timestamp.saturating_add(clock_skew))
                        .await;
                }
                // Includes the guarded cases above when their guard does not hold.
                Err(err) => return Err(err.into()),
            }
        }
    }
624
625 async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
626 let local_admin_info = self.client.local_node.chain_info(self.admin_id).await?;
627 Box::pin(self.send_chain_information(
628 self.admin_id,
629 local_admin_info.next_block_height,
630 CrossChainMessageDelivery::NonBlocking,
631 None,
632 ))
633 .await
634 }
635
    /// Brings the validator's view of chain `chain_id` up to `target_block_height`
    /// (exclusive) and, if it is then at our height but in an earlier round, sends it what
    /// the local chain manager holds for the current round.
    ///
    /// Steps:
    ///
    /// 1. If `target_block_height` is non-zero, the certificate at `target_block_height - 1`
    ///    is sent first. `latest_certificate` is used for this if given (NOTE(review):
    ///    presumably it must be the certificate at that height; this is not checked here),
    ///    otherwise it is read from local storage. If the validator rejects it, the
    ///    validator's chain info is queried instead. All certificates from the validator's
    ///    reported next block height up to the target are then sent in order.
    /// 2. If `target_block_height` is zero, the chains recorded in the blob state of this
    ///    chain's description blob are sent, and the validator's chain info is queried.
    /// 3. If the local node is at the same height as the validator but in a later round, the
    ///    current round's proposals, then the locking validated block, then the timeout
    ///    certificate are tried; the method returns after the first proposal or locking
    ///    block the validator accepts.
    ///
    /// # Errors
    ///
    /// Returns an error if local data cannot be read, if a certificate in step 1's catch-up
    /// is rejected, or if the validator's chain info cannot be queried. Failures to deliver
    /// a proposal or locking block in step 3 are only logged.
    pub async fn send_chain_information(
        &mut self,
        chain_id: ChainId,
        target_block_height: BlockHeight,
        delivery: CrossChainMessageDelivery,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<(), chain_client::Error> {
        // `try_sub_one` fails exactly when the target height is zero, i.e. when there is
        // no block to send.
        let info = if let Ok(height) = target_block_height.try_sub_one() {
            // Optimistically send only the last certificate below the target height.
            let certificate = if let Some(cert) = latest_certificate {
                cert
            } else {
                let hash = self
                    .client
                    .local_node
                    .get_block_hashes(chain_id, vec![height])
                    .await?
                    .into_iter()
                    .next()
                    .ok_or_else(|| {
                        chain_client::Error::InternalError(
                            "send_chain_information called with invalid target_block_height",
                        )
                    })?;
                self.client
                    .local_node
                    .storage_client()
                    .read_certificate(hash)
                    .await?
                    .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?
            };
            let info = match self.send_confirmed_certificate(certificate, delivery).await {
                Ok(info) => info,
                Err(error) => {
                    tracing::debug!(
                        address = self.remote_node.address(), %error,
                        "validator failed to handle confirmed certificate; sending whole chain",
                    );
                    // Ask the validator where it stands so the gap can be filled below.
                    let query = ChainInfoQuery::new(chain_id);
                    self.remote_node.handle_chain_info_query(query).await?
                }
            };
            // Heights the validator still lacks; empty if it is already at the target.
            let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
                .map(BlockHeight)
                .collect();
            let validator_missing_hashes = self
                .client
                .local_node
                .get_block_hashes(chain_id, heights)
                .await?;
            if !validator_missing_hashes.is_empty() {
                let certificates = self
                    .client
                    .local_node
                    .storage_client()
                    .read_certificates(validator_missing_hashes.clone())
                    .await?;
                // Fail if any of the requested certificates could not be read.
                let certificates =
                    match ResultReadCertificates::new(certificates, validator_missing_hashes) {
                        ResultReadCertificates::Certificates(certificates) => certificates,
                        ResultReadCertificates::InvalidHashes(hashes) => {
                            return Err(chain_client::Error::ReadCertificatesError(hashes))
                        }
                    };
                for certificate in certificates {
                    self.send_confirmed_certificate(certificate, delivery)
                        .await?;
                }
            }
            info
        } else {
            // Target height zero: look up the blob state of this chain's description blob
            // and send the chain(s) recorded there.
            let blob_states = self
                .client
                .local_node
                .read_blob_states_from_storage(&[BlobId::new(
                    chain_id.0,
                    BlobType::ChainDescription,
                )])
                .await?;
            // For every chain recorded in a blob state, keep the highest (exclusive)
            // height that has to be sent.
            let mut chain_heights = BTreeMap::new();
            for blob_state in blob_states {
                let block_chain_id = blob_state.chain_id;
                let block_height = blob_state.block_height.try_add_one()?;
                chain_heights
                    .entry(block_chain_id)
                    .and_modify(|h| *h = block_height.max(*h))
                    .or_insert(block_height);
            }
            // Note: always non-blocking here, regardless of the `delivery` argument.
            self.send_chain_info_up_to_heights(
                chain_heights,
                CrossChainMessageDelivery::NonBlocking,
            )
            .await?;
            let query = ChainInfoQuery::new(chain_id);
            self.remote_node.handle_chain_info_query(query).await?
        };
        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
        let query = ChainInfoQuery::new(chain_id).with_manager_values();
        let local_info = match self.client.local_node.handle_chain_info_query(query).await {
            Ok(response) => response.info,
            // Missing blobs locally are not treated as a failure: there is simply no
            // manager state to forward.
            Err(LocalNodeError::BlobsNotFound(_)) => return Ok(()),
            Err(error) => return Err(error.into()),
        };
        let manager = local_info.manager;
        // Only continue if the validator is at our height but in an earlier round.
        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
            return Ok(());
        }
        // Try the proposals for the current round; stop at the first one accepted.
        for proposal in manager
            .requested_proposed
            .into_iter()
            .chain(manager.requested_signed_proposal)
        {
            if proposal.content.round == manager.current_round {
                if let Err(error) = self.remote_node.handle_block_proposal(proposal).await {
                    tracing::info!(%error, "failed to send block proposal");
                } else {
                    return Ok(());
                }
            }
        }
        // Next, try the locking validated block if it belongs to the current round.
        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.map(|b| *b) {
            if validated.round == manager.current_round {
                if let Err(error) = self
                    .remote_node
                    .handle_optimized_validated_certificate(
                        &validated,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await
                {
                    tracing::info!(%error, "failed to send locking block");
                } else {
                    return Ok(());
                }
            }
        }
        // Finally, send the timeout certificate if it is not older than the validator's
        // round; unlike the attempts above, a failure here is returned to the caller.
        if let Some(cert) = manager.timeout {
            if cert.round >= remote_round {
                tracing::debug!(round = %cert.round, "sending timeout");
                self.remote_node.handle_timeout_certificate(*cert).await?;
            }
        }
        Ok(())
    }
787
788 async fn send_chain_info_up_to_heights(
789 &mut self,
790 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
791 delivery: CrossChainMessageDelivery,
792 ) -> Result<(), chain_client::Error> {
793 FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
794 let mut updater = self.clone();
795 async move {
796 updater
797 .send_chain_information(chain_id, height, delivery, None)
798 .await
799 }
800 }))
801 .try_collect::<Vec<_>>()
802 .await?;
803 Ok(())
804 }
805
806 pub async fn send_chain_update(
807 &mut self,
808 action: CommunicateAction,
809 ) -> Result<LiteVote, chain_client::Error> {
810 let chain_id = match &action {
811 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
812 CommunicateAction::FinalizeBlock { certificate, .. } => {
813 certificate.inner().block().header.chain_id
814 }
815 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
816 };
817 let vote = match action {
819 CommunicateAction::SubmitBlock {
820 proposal,
821 blob_ids,
822 clock_skew_sender,
823 } => {
824 let info = self
825 .send_block_proposal(proposal, blob_ids, clock_skew_sender)
826 .await?;
827 info.manager.pending.ok_or_else(|| {
828 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
829 })?
830 }
831 CommunicateAction::FinalizeBlock {
832 certificate,
833 delivery,
834 } => {
835 let info = self
836 .send_validated_certificate(*certificate, delivery)
837 .await?;
838 info.manager.pending.ok_or_else(|| {
839 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
840 })?
841 }
842 CommunicateAction::RequestTimeout { round, height, .. } => {
843 let info = self.request_timeout(chain_id, round, height).await?;
844 info.manager.timeout_vote.ok_or_else(|| {
845 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
846 })?
847 }
848 };
849 vote.check(self.remote_node.public_key)?;
850 Ok(vote)
851 }
852}