linera_core/
updater.rs

// Copyright (c) Facebook, Inc. and its affiliates.
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt,
8    hash::Hash,
9    mem,
10    sync::Arc,
11};
12
13use futures::{
14    stream::{FuturesUnordered, TryStreamExt},
15    Future, StreamExt,
16};
17use linera_base::{
18    crypto::ValidatorPublicKey,
19    data_types::{BlockHeight, Round},
20    ensure,
21    identifiers::{BlobId, BlobType, ChainId, StreamId},
22    time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25    data_types::{BlockProposal, LiteVote},
26    manager::LockingBlock,
27    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{ResultReadCertificates, Storage};
31use thiserror::Error;
32use tracing::{instrument, Level};
33
34use crate::{
35    client::{chain_client, Client},
36    data_types::{ChainInfo, ChainInfoQuery},
37    environment::Environment,
38    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
39    remote_node::RemoteNode,
40    LocalNodeError,
41};
42
/// Default grace period: the extra time granted to additional validators to contribute
/// to the result, expressed as a fraction of the time it took to reach a quorum.
pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
/// The maximum timeout for requests to a stake-weighted quorum if no quorum is reached
/// (one day).
const MAX_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
48
49/// Used for `communicate_chain_action`
50#[derive(Clone)]
51pub enum CommunicateAction {
52    SubmitBlock {
53        proposal: Box<BlockProposal>,
54        blob_ids: Vec<BlobId>,
55    },
56    FinalizeBlock {
57        certificate: Box<ValidatedBlockCertificate>,
58        delivery: CrossChainMessageDelivery,
59    },
60    RequestTimeout {
61        chain_id: ChainId,
62        height: BlockHeight,
63        round: Round,
64    },
65}
66
67impl CommunicateAction {
68    /// The round to which this action pertains.
69    pub fn round(&self) -> Round {
70        match self {
71            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
72            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
73            CommunicateAction::RequestTimeout { round, .. } => *round,
74        }
75    }
76}
77
78pub struct ValidatorUpdater<Env>
79where
80    Env: Environment,
81{
82    pub remote_node: RemoteNode<Env::ValidatorNode>,
83    pub client: Arc<Client<Env>>,
84    pub admin_id: ChainId,
85}
86
87impl<Env: Environment> Clone for ValidatorUpdater<Env> {
88    fn clone(&self) -> Self {
89        ValidatorUpdater {
90            remote_node: self.remote_node.clone(),
91            client: self.client.clone(),
92            admin_id: self.admin_id,
93        }
94    }
95}
96
97/// An error result for requests to a stake-weighted quorum.
98#[derive(Error, Debug)]
99pub enum CommunicationError<E: fmt::Debug> {
100    /// No consensus is possible since validators returned different possibilities
101    /// for the next block
102    #[error(
103        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
104    )]
105    NoConsensus(u64, Vec<(u64, usize)>),
106    /// A single error that was returned by a sufficient number of nodes to be trusted as
107    /// valid.
108    #[error("Failed to communicate with a quorum of validators: {0}")]
109    Trusted(E),
110    /// No single error reached the validity threshold so we're returning a sample of
111    /// errors for debugging purposes, together with their weight.
112    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
113    Sample(Vec<(E, u64)>),
114}
115
116/// Executes a sequence of actions in parallel for all validators.
117///
118/// Tries to stop early when a quorum is reached. If `grace_period` is specified, other validators
119/// are given additional time to contribute to the result. The grace period is calculated as a fraction
120/// (defaulting to `DEFAULT_GRACE_PERIOD`) of the time taken to reach quorum.
121pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
122    validator_clients: &'a [RemoteNode<A>],
123    committee: &Committee,
124    group_by: G,
125    execute: F,
126    // Grace period as a fraction of time taken to reach quorum
127    grace_period: f64,
128) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
129where
130    A: ValidatorNode + Clone + 'static,
131    F: Clone + Fn(RemoteNode<A>) -> R,
132    R: Future<Output = Result<V, chain_client::Error>> + 'a,
133    G: Fn(&V) -> K,
134    K: Hash + PartialEq + Eq + Clone + 'static,
135    V: 'static,
136{
137    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
138        .iter()
139        .filter_map(|remote_node| {
140            if committee.weight(&remote_node.public_key) == 0 {
141                // This should not happen but better prevent it because certificates
142                // are not allowed to include votes with weight 0.
143                return None;
144            }
145            let execute = execute.clone();
146            let remote_node = remote_node.clone();
147            Some(async move { (remote_node.public_key, execute(remote_node).await) })
148        })
149        .collect();
150
151    let start_time = Instant::now();
152    let mut end_time: Option<Instant> = None;
153    let mut remaining_votes = committee.total_votes();
154    let mut highest_key_score = 0;
155    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
156    let mut error_scores = HashMap::new();
157
158    'vote_wait: while let Ok(Some((name, result))) = timeout(
159        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
160        responses.next(),
161    )
162    .await
163    {
164        remaining_votes -= committee.weight(&name);
165        match result {
166            Ok(value) => {
167                let key = group_by(&value);
168                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
169                entry.0 += committee.weight(&name);
170                entry.1.push((name, value));
171                highest_key_score = highest_key_score.max(entry.0);
172            }
173            Err(err) => {
174                // TODO(#2857): Handle non-remote errors properly.
175                let err = match err {
176                    chain_client::Error::RemoteNodeError(err) => err,
177                    err => NodeError::ResponseHandlingError {
178                        error: err.to_string(),
179                    },
180                };
181                let entry = error_scores.entry(err.clone()).or_insert(0);
182                *entry += committee.weight(&name);
183                if *entry >= committee.validity_threshold() {
184                    // At least one honest node returned this error.
185                    // No quorum can be reached, so return early.
186                    return Err(CommunicationError::Trusted(err));
187                }
188            }
189        }
190        // If it becomes clear that no key can reach a quorum, break early.
191        if highest_key_score + remaining_votes < committee.quorum_threshold() {
192            break 'vote_wait;
193        }
194
195        // If a key reaches a quorum, wait for the grace period to collect more values
196        // or error information and then stop.
197        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
198            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(grace_period));
199        }
200    }
201
202    let scores = value_scores
203        .values()
204        .map(|(weight, values)| (*weight, values.len()))
205        .collect();
206    // If a key has a quorum, return it with its values.
207    if let Some((key, (_, values))) = value_scores
208        .into_iter()
209        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
210    {
211        return Ok((key, values));
212    }
213
214    if error_scores.is_empty() {
215        return Err(CommunicationError::NoConsensus(
216            committee.quorum_threshold(),
217            scores,
218        ));
219    }
220
221    // No specific error is available to report reliably.
222    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
223    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
224    sample.truncate(4);
225    Err(CommunicationError::Sample(sample))
226}
227
228impl<Env> ValidatorUpdater<Env>
229where
230    Env: Environment + 'static,
231{
232    #[instrument(
233        level = "trace", skip_all, err(level = Level::WARN),
234        fields(chain_id = %certificate.block().header.chain_id)
235    )]
236    async fn send_confirmed_certificate(
237        &mut self,
238        certificate: GenericCertificate<ConfirmedBlock>,
239        delivery: CrossChainMessageDelivery,
240    ) -> Result<Box<ChainInfo>, chain_client::Error> {
241        let mut result = self
242            .remote_node
243            .handle_optimized_confirmed_certificate(&certificate, delivery)
244            .await;
245
246        let mut sent_admin_chain = false;
247        let mut sent_blobs = false;
248        loop {
249            match result {
250                Err(NodeError::EventsNotFound(event_ids))
251                    if !sent_admin_chain
252                        && certificate.inner().chain_id() != self.admin_id
253                        && event_ids.iter().all(|event_id| {
254                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
255                                && event_id.chain_id == self.admin_id
256                        }) =>
257                {
258                    // The validator doesn't have the committee that signed the certificate.
259                    self.update_admin_chain().await?;
260                    sent_admin_chain = true;
261                }
262                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
263                    // The validator is missing the blobs required by the certificate.
264                    self.remote_node
265                        .check_blobs_not_found(&certificate, &blob_ids)?;
266                    // The certificate is confirmed, so the blobs must be in storage.
267                    let maybe_blobs = self
268                        .client
269                        .local_node
270                        .read_blobs_from_storage(&blob_ids)
271                        .await?;
272                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
273                    self.remote_node.node.upload_blobs(blobs).await?;
274                    sent_blobs = true;
275                }
276                result => return Ok(result?),
277            }
278            result = self
279                .remote_node
280                .handle_confirmed_certificate(certificate.clone(), delivery)
281                .await;
282        }
283    }
284
285    async fn send_validated_certificate(
286        &mut self,
287        certificate: GenericCertificate<ValidatedBlock>,
288        delivery: CrossChainMessageDelivery,
289    ) -> Result<Box<ChainInfo>, chain_client::Error> {
290        let result = self
291            .remote_node
292            .handle_optimized_validated_certificate(&certificate, delivery)
293            .await;
294
295        let chain_id = certificate.inner().chain_id();
296        match &result {
297            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
298                self.remote_node
299                    .check_blobs_not_found(&certificate, blob_ids)?;
300                // The certificate is for a validated block, i.e. for our locking block.
301                // Take the missing blobs from our local chain manager.
302                let blobs = self
303                    .client
304                    .local_node
305                    .get_locking_blobs(blob_ids, chain_id)
306                    .await?
307                    .ok_or_else(|| original_err.clone())?;
308                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
309            }
310            Err(error) => {
311                self.sync_if_needed(
312                    chain_id,
313                    certificate.round,
314                    certificate.block().header.height,
315                    error,
316                )
317                .await?;
318            }
319            _ => return Ok(result?),
320        }
321        Ok(self
322            .remote_node
323            .handle_validated_certificate(certificate)
324            .await?)
325    }
326
327    /// Requests a vote for a timeout certificate for the given round from the remote node.
328    ///
329    /// If the remote node is not in that round or at that height yet, sends the chain information
330    /// to update it.
331    async fn request_timeout(
332        &mut self,
333        chain_id: ChainId,
334        round: Round,
335        height: BlockHeight,
336    ) -> Result<Box<ChainInfo>, chain_client::Error> {
337        let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
338        let result = self
339            .remote_node
340            .handle_chain_info_query(query.clone())
341            .await;
342        if let Err(err) = &result {
343            self.sync_if_needed(chain_id, round, height, err).await?;
344        }
345        Ok(result?)
346    }
347
348    /// Synchronizes either the local node or the remote node, if one of them is lagging behind.
349    async fn sync_if_needed(
350        &mut self,
351        chain_id: ChainId,
352        round: Round,
353        height: BlockHeight,
354        error: &NodeError,
355    ) -> Result<(), chain_client::Error> {
356        let address = &self.remote_node.address();
357        match error {
358            NodeError::WrongRound(validator_round) if *validator_round > round => {
359                tracing::debug!(
360                    address, %chain_id, %validator_round, %round,
361                    "validator is at a higher round; synchronizing",
362                );
363                self.client
364                    .synchronize_chain_state_from(&self.remote_node, chain_id)
365                    .await?;
366            }
367            NodeError::UnexpectedBlockHeight {
368                expected_block_height,
369                found_block_height,
370            } if expected_block_height > found_block_height => {
371                tracing::debug!(
372                    address,
373                    %chain_id,
374                    %expected_block_height,
375                    %found_block_height,
376                    "validator is at a higher height; synchronizing",
377                );
378                self.client
379                    .synchronize_chain_state_from(&self.remote_node, chain_id)
380                    .await?;
381            }
382            NodeError::WrongRound(validator_round) if *validator_round < round => {
383                tracing::debug!(
384                    address, %chain_id, %validator_round, %round,
385                    "validator is at a lower round; sending chain info",
386                );
387                self.send_chain_information(
388                    chain_id,
389                    height,
390                    CrossChainMessageDelivery::NonBlocking,
391                    None,
392                )
393                .await?;
394            }
395            NodeError::UnexpectedBlockHeight {
396                expected_block_height,
397                found_block_height,
398            } if expected_block_height < found_block_height => {
399                tracing::debug!(
400                    address,
401                    %chain_id,
402                    %expected_block_height,
403                    %found_block_height,
404                    "Validator is at a lower height; sending chain info.",
405                );
406                self.send_chain_information(
407                    chain_id,
408                    height,
409                    CrossChainMessageDelivery::NonBlocking,
410                    None,
411                )
412                .await?;
413            }
414            NodeError::InactiveChain(chain_id) => {
415                tracing::debug!(
416                    address,
417                    %chain_id,
418                    "Validator has inactive chain; sending chain info.",
419                );
420                self.send_chain_information(
421                    *chain_id,
422                    height,
423                    CrossChainMessageDelivery::NonBlocking,
424                    None,
425                )
426                .await?;
427            }
428            _ => {}
429        }
430        Ok(())
431    }
432
433    async fn send_block_proposal(
434        &mut self,
435        proposal: Box<BlockProposal>,
436        mut blob_ids: Vec<BlobId>,
437    ) -> Result<Box<ChainInfo>, chain_client::Error> {
438        let chain_id = proposal.content.block.chain_id;
439        let mut sent_cross_chain_updates = BTreeMap::new();
440        let mut publisher_chain_ids_sent = BTreeSet::new();
441        loop {
442            match self
443                .remote_node
444                .handle_block_proposal(proposal.clone())
445                .await
446            {
447                Ok(info) => return Ok(info),
448                Err(NodeError::WrongRound(_round)) => {
449                    // The proposal is for a different round, so we need to update the validator.
450                    // TODO: this should probably be more specific as to which rounds are retried.
451                    tracing::debug!(
452                        remote_node = self.remote_node.address(),
453                        %chain_id,
454                        "wrong round; sending chain to validator",
455                    );
456                    self.send_chain_information(
457                        chain_id,
458                        proposal.content.block.height,
459                        CrossChainMessageDelivery::NonBlocking,
460                        None,
461                    )
462                    .await?;
463                }
464                Err(NodeError::UnexpectedBlockHeight {
465                    expected_block_height,
466                    found_block_height,
467                }) if expected_block_height < found_block_height
468                    && found_block_height == proposal.content.block.height =>
469                {
470                    tracing::debug!(
471                        remote_node = self.remote_node.address(),
472                        %chain_id,
473                        "wrong height; sending chain to validator",
474                    );
475                    // The proposal is for a later block height, so we need to update the validator.
476                    self.send_chain_information(
477                        chain_id,
478                        found_block_height,
479                        CrossChainMessageDelivery::NonBlocking,
480                        None,
481                    )
482                    .await?;
483                }
484                Err(NodeError::MissingCrossChainUpdate {
485                    chain_id,
486                    origin,
487                    height,
488                }) if chain_id == proposal.content.block.chain_id
489                    && sent_cross_chain_updates
490                        .get(&origin)
491                        .is_none_or(|h| *h < height) =>
492                {
493                    tracing::debug!(
494                        remote_node = %self.remote_node.address(),
495                        chain_id = %origin,
496                        "Missing cross-chain update; sending chain to validator.",
497                    );
498                    sent_cross_chain_updates.insert(origin, height);
499                    // Some received certificates may be missing for this validator
500                    // (e.g. to create the chain or make the balance sufficient) so we are going to
501                    // synchronize them now and retry.
502                    self.send_chain_information(
503                        origin,
504                        height.try_add_one()?,
505                        CrossChainMessageDelivery::Blocking,
506                        None,
507                    )
508                    .await?;
509                }
510                Err(NodeError::EventsNotFound(event_ids)) => {
511                    let mut publisher_heights = BTreeMap::new();
512                    let chain_ids = event_ids
513                        .iter()
514                        .map(|event_id| event_id.chain_id)
515                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
516                        .collect::<BTreeSet<_>>();
517                    tracing::debug!(
518                        remote_node = self.remote_node.address(),
519                        ?chain_ids,
520                        "missing events; sending chains to validator",
521                    );
522                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
523                    for chain_id in chain_ids {
524                        let height = self
525                            .client
526                            .local_node
527                            .chain_state_view(chain_id)
528                            .await?
529                            .next_height_to_preprocess()
530                            .await?;
531                        publisher_heights.insert(chain_id, height);
532                        publisher_chain_ids_sent.insert(chain_id);
533                    }
534                    self.send_chain_info_up_to_heights(
535                        publisher_heights,
536                        CrossChainMessageDelivery::NonBlocking,
537                    )
538                    .await?;
539                }
540                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
541                    if !blob_ids.is_empty() =>
542                {
543                    tracing::debug!("Missing blobs");
544                    // For `BlobsNotFound`, we assume that the local node should already be
545                    // updated with the needed blobs, so sending the chain information about the
546                    // certificates that last used the blobs to the validator node should be enough.
547                    let published_blob_ids =
548                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
549                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
550                    let mut published_blobs = Vec::new();
551                    {
552                        let chain = self.client.local_node.chain_state_view(chain_id).await?;
553                        for blob_id in published_blob_ids {
554                            published_blobs
555                                .extend(chain.manager.proposed_blobs.get(&blob_id).await?);
556                        }
557                    }
558                    self.remote_node
559                        .send_pending_blobs(chain_id, published_blobs)
560                        .await?;
561                    let missing_blob_ids = self
562                        .remote_node
563                        .node
564                        .missing_blob_ids(mem::take(&mut blob_ids))
565                        .await?;
566                    let blob_states = self
567                        .client
568                        .local_node
569                        .read_blob_states_from_storage(&missing_blob_ids)
570                        .await?;
571                    let mut chain_heights = BTreeMap::new();
572                    for blob_state in blob_states {
573                        let block_chain_id = blob_state.chain_id;
574                        let block_height = blob_state.block_height.try_add_one()?;
575                        chain_heights
576                            .entry(block_chain_id)
577                            .and_modify(|h| *h = block_height.max(*h))
578                            .or_insert(block_height);
579                    }
580                    tracing::debug!("Sending chains {chain_heights:?}");
581
582                    self.send_chain_info_up_to_heights(
583                        chain_heights,
584                        CrossChainMessageDelivery::NonBlocking,
585                    )
586                    .await?;
587                }
588                // Fail immediately on other errors.
589                Err(err) => return Err(err.into()),
590            }
591        }
592    }
593
594    async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
595        let local_admin_info = self.client.local_node.chain_info(self.admin_id).await?;
596        Box::pin(self.send_chain_information(
597            self.admin_id,
598            local_admin_info.next_block_height,
599            CrossChainMessageDelivery::NonBlocking,
600            None,
601        ))
602        .await
603    }
604
605    pub async fn send_chain_information(
606        &mut self,
607        chain_id: ChainId,
608        target_block_height: BlockHeight,
609        delivery: CrossChainMessageDelivery,
610        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
611    ) -> Result<(), chain_client::Error> {
612        let info = if let Ok(height) = target_block_height.try_sub_one() {
613            // Figure out which certificates this validator is missing. In many cases, it's just the
614            // last one, so we optimistically send that one right away.
615            let certificate = if let Some(cert) = latest_certificate {
616                cert
617            } else {
618                let hash = self
619                    .client
620                    .local_node
621                    .chain_state_view(chain_id)
622                    .await?
623                    .block_hashes([height])
624                    .await?
625                    .into_iter()
626                    .next()
627                    .ok_or_else(|| {
628                        chain_client::Error::InternalError(
629                            "send_chain_information called with invalid target_block_height",
630                        )
631                    })?;
632                self.client
633                    .local_node
634                    .storage_client()
635                    .read_certificate(hash)
636                    .await?
637                    .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?
638            };
639            let info = match self.send_confirmed_certificate(certificate, delivery).await {
640                Ok(info) => info,
641                Err(error) => {
642                    tracing::debug!(
643                        address = self.remote_node.address(), %error,
644                        "validator failed to handle confirmed certificate; sending whole chain",
645                    );
646                    let query = ChainInfoQuery::new(chain_id);
647                    self.remote_node.handle_chain_info_query(query).await?
648                }
649            };
650            // Obtain the missing blocks and the manager state from the local node.
651            let heights = (info.next_block_height.0..target_block_height.0).map(BlockHeight);
652            let validator_missing_hashes = self
653                .client
654                .local_node
655                .chain_state_view(chain_id)
656                .await?
657                .block_hashes(heights)
658                .await?;
659            if !validator_missing_hashes.is_empty() {
660                // Send the requested certificates in order.
661                let certificates = self
662                    .client
663                    .local_node
664                    .storage_client()
665                    .read_certificates(validator_missing_hashes.clone())
666                    .await?;
667                let certificates =
668                    match ResultReadCertificates::new(certificates, validator_missing_hashes) {
669                        ResultReadCertificates::Certificates(certificates) => certificates,
670                        ResultReadCertificates::InvalidHashes(hashes) => {
671                            return Err(chain_client::Error::ReadCertificatesError(hashes))
672                        }
673                    };
674                for certificate in certificates {
675                    self.send_confirmed_certificate(certificate, delivery)
676                        .await?;
677                }
678            }
679            info
680        } else {
681            // The remote node might not know about the chain yet.
682            let blob_states = self
683                .client
684                .local_node
685                .read_blob_states_from_storage(&[BlobId::new(
686                    chain_id.0,
687                    BlobType::ChainDescription,
688                )])
689                .await?;
690            let mut chain_heights = BTreeMap::new();
691            for blob_state in blob_states {
692                let block_chain_id = blob_state.chain_id;
693                let block_height = blob_state.block_height.try_add_one()?;
694                chain_heights
695                    .entry(block_chain_id)
696                    .and_modify(|h| *h = block_height.max(*h))
697                    .or_insert(block_height);
698            }
699            self.send_chain_info_up_to_heights(
700                chain_heights,
701                CrossChainMessageDelivery::NonBlocking,
702            )
703            .await?;
704            let query = ChainInfoQuery::new(chain_id);
705            self.remote_node.handle_chain_info_query(query).await?
706        };
707        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
708        let query = ChainInfoQuery::new(chain_id).with_manager_values();
709        let local_info = match self.client.local_node.handle_chain_info_query(query).await {
710            Ok(response) => response.info,
711            // We don't have the full chain description.
712            Err(LocalNodeError::BlobsNotFound(_)) => return Ok(()),
713            Err(error) => return Err(error.into()),
714        };
715        let manager = local_info.manager;
716        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
717            return Ok(());
718        }
719        // The remote node is at our height but not at the current round. Send it the proposal,
720        // validated block certificate or timeout certificate that proves the current round.
721        for proposal in manager
722            .requested_proposed
723            .into_iter()
724            .chain(manager.requested_signed_proposal)
725        {
726            if proposal.content.round == manager.current_round {
727                if let Err(error) = self.remote_node.handle_block_proposal(proposal).await {
728                    tracing::info!(%error, "failed to send block proposal");
729                } else {
730                    return Ok(());
731                }
732            }
733        }
734        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.map(|b| *b) {
735            if validated.round == manager.current_round {
736                if let Err(error) = self
737                    .remote_node
738                    .handle_optimized_validated_certificate(
739                        &validated,
740                        CrossChainMessageDelivery::NonBlocking,
741                    )
742                    .await
743                {
744                    tracing::info!(%error, "failed to send locking block");
745                } else {
746                    return Ok(());
747                }
748            }
749        }
750        if let Some(cert) = manager.timeout {
751            if cert.round >= remote_round {
752                tracing::debug!(round = %cert.round, "sending timeout");
753                self.remote_node.handle_timeout_certificate(*cert).await?;
754            }
755        }
756        Ok(())
757    }
758
759    async fn send_chain_info_up_to_heights(
760        &mut self,
761        chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
762        delivery: CrossChainMessageDelivery,
763    ) -> Result<(), chain_client::Error> {
764        FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
765            let mut updater = self.clone();
766            async move {
767                updater
768                    .send_chain_information(chain_id, height, delivery, None)
769                    .await
770            }
771        }))
772        .try_collect::<Vec<_>>()
773        .await?;
774        Ok(())
775    }
776
777    pub async fn send_chain_update(
778        &mut self,
779        action: CommunicateAction,
780    ) -> Result<LiteVote, chain_client::Error> {
781        let chain_id = match &action {
782            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
783            CommunicateAction::FinalizeBlock { certificate, .. } => {
784                certificate.inner().block().header.chain_id
785            }
786            CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
787        };
788        // Send the block proposal, certificate or timeout request and return a vote.
789        let vote = match action {
790            CommunicateAction::SubmitBlock { proposal, blob_ids } => {
791                let info = self.send_block_proposal(proposal, blob_ids).await?;
792                info.manager.pending.ok_or_else(|| {
793                    NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
794                })?
795            }
796            CommunicateAction::FinalizeBlock {
797                certificate,
798                delivery,
799            } => {
800                let info = self
801                    .send_validated_certificate(*certificate, delivery)
802                    .await?;
803                info.manager.pending.ok_or_else(|| {
804                    NodeError::MissingVoteInValidatorResponse("finalize a block".into())
805                })?
806            }
807            CommunicateAction::RequestTimeout { round, height, .. } => {
808                let info = self.request_timeout(chain_id, round, height).await?;
809                info.manager.timeout_vote.ok_or_else(|| {
810                    NodeError::MissingVoteInValidatorResponse("request a timeout".into())
811                })?
812            }
813        };
814        vote.check(self.remote_node.public_key)?;
815        Ok(vote)
816    }
817}