linera_core/
updater.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt,
8    hash::Hash,
9    mem,
10    sync::Arc,
11};
12
13use futures::{
14    stream::{FuturesUnordered, TryStreamExt},
15    Future, StreamExt,
16};
17use linera_base::{
18    crypto::ValidatorPublicKey,
19    data_types::{BlockHeight, Round},
20    ensure,
21    identifiers::{BlobId, BlobType, ChainId, StreamId},
22    time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25    data_types::{BlockProposal, LiteVote},
26    manager::LockingBlock,
27    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{ResultReadCertificates, Storage};
31use thiserror::Error;
32use tracing::{instrument, Level};
33
34use crate::{
35    client::{chain_client, Client},
36    data_types::{ChainInfo, ChainInfoQuery},
37    environment::Environment,
38    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
39    remote_node::RemoteNode,
40    LocalNodeError,
41};
42
/// The default amount of time we wait for additional validators to contribute
/// to the result, as a fraction of how long it took to reach a quorum.
///
/// For example, with a fraction of `0.2`, a quorum that took 10 seconds to reach is
/// followed by up to 2 more seconds of waiting for further responses.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;
/// The maximum timeout for requests to a stake-weighted quorum if no quorum is reached.
///
/// As long as no grace-period deadline has been set, this bounds the wait for each
/// next validator response.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); // 1 day.
48
/// Used for `communicate_chain_action`
///
/// Describes the single request that `ValidatorUpdater::send_chain_update` sends to a
/// validator in order to obtain a vote.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal and ask for the validator's vote on it.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs accompanying the proposal. If the validator reports missing blobs,
        /// these are used to decide which blobs and chains to send to it.
        blob_ids: Vec<BlobId>,
    },
    /// Send a certificate for a validated block and ask for the validator's vote.
    FinalizeBlock {
        /// The validated block certificate to send.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain message delivery mode passed along with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask the validator for a timeout vote.
    RequestTimeout {
        /// The chain for which the timeout is requested.
        chain_id: ChainId,
        /// The block height at which the timeout is requested.
        height: BlockHeight,
        /// The round that should time out.
        round: Round,
    },
}
66
67impl CommunicateAction {
68    /// The round to which this action pertains.
69    pub fn round(&self) -> Round {
70        match self {
71            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
72            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
73            CommunicateAction::RequestTimeout { round, .. } => *round,
74        }
75    }
76}
77
/// Sends chain data (proposals, certificates, blobs) from the local node to a single
/// validator, reacting to the validator's errors by sending whatever it is missing.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator that this updater talks to.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The local client; its local node and storage are the source of the data that
    /// gets sent to the validator.
    pub client: Arc<Client<Env>>,
    /// The ID of the admin chain, which is sent to the validator when it is missing
    /// epoch events from that chain.
    pub admin_id: ChainId,
}
86
87impl<Env: Environment> Clone for ValidatorUpdater<Env> {
88    fn clone(&self) -> Self {
89        ValidatorUpdater {
90            remote_node: self.remote_node.clone(),
91            client: self.client.clone(),
92            admin_id: self.admin_id,
93        }
94    }
95}
96
/// An error result for requests to a stake-weighted quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No consensus is possible since validators returned different possibilities
    /// for the next block
    ///
    /// The first field is the quorum threshold. The second contains, for every distinct
    /// response key, the accumulated weight and the number of responses with that key.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// A single error that was returned by a sufficient number of nodes to be trusted as
    /// valid.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold so we're returning a sample of
    /// errors for debugging purposes, together with their weight.
    ///
    /// `communicate_with_quorum` reports at most the four errors with the highest weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
115
116/// Executes a sequence of actions in parallel for all validators.
117///
118/// Tries to stop early when a quorum is reached. If `quorum_grace_period` is specified, other
119/// validators are given additional time to contribute to the result. The grace period is
120/// calculated as a fraction (defaulting to `DEFAULT_QUORUM_GRACE_PERIOD`) of the time taken to
121/// reach quorum.
122pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
123    validator_clients: &'a [RemoteNode<A>],
124    committee: &Committee,
125    group_by: G,
126    execute: F,
127    // Grace period as a fraction of time taken to reach quorum.
128    quorum_grace_period: f64,
129) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
130where
131    A: ValidatorNode + Clone + 'static,
132    F: Clone + Fn(RemoteNode<A>) -> R,
133    R: Future<Output = Result<V, chain_client::Error>> + 'a,
134    G: Fn(&V) -> K,
135    K: Hash + PartialEq + Eq + Clone + 'static,
136    V: 'static,
137{
138    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
139        .iter()
140        .filter_map(|remote_node| {
141            if committee.weight(&remote_node.public_key) == 0 {
142                // This should not happen but better prevent it because certificates
143                // are not allowed to include votes with weight 0.
144                return None;
145            }
146            let execute = execute.clone();
147            let remote_node = remote_node.clone();
148            Some(async move { (remote_node.public_key, execute(remote_node).await) })
149        })
150        .collect();
151
152    let start_time = Instant::now();
153    let mut end_time: Option<Instant> = None;
154    let mut remaining_votes = committee.total_votes();
155    let mut highest_key_score = 0;
156    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
157    let mut error_scores = HashMap::new();
158
159    'vote_wait: while let Ok(Some((name, result))) = timeout(
160        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
161        responses.next(),
162    )
163    .await
164    {
165        remaining_votes -= committee.weight(&name);
166        match result {
167            Ok(value) => {
168                let key = group_by(&value);
169                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
170                entry.0 += committee.weight(&name);
171                entry.1.push((name, value));
172                highest_key_score = highest_key_score.max(entry.0);
173            }
174            Err(err) => {
175                // TODO(#2857): Handle non-remote errors properly.
176                let err = match err {
177                    chain_client::Error::RemoteNodeError(err) => err,
178                    err => NodeError::ResponseHandlingError {
179                        error: err.to_string(),
180                    },
181                };
182                let entry = error_scores.entry(err.clone()).or_insert(0);
183                *entry += committee.weight(&name);
184                if *entry >= committee.validity_threshold() {
185                    // At least one honest node returned this error.
186                    // No quorum can be reached, so return early.
187                    return Err(CommunicationError::Trusted(err));
188                }
189            }
190        }
191        // If it becomes clear that no key can reach a quorum, break early.
192        if highest_key_score + remaining_votes < committee.quorum_threshold() {
193            break 'vote_wait;
194        }
195
196        // If a key reaches a quorum, wait for the grace period to collect more values
197        // or error information and then stop.
198        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
199            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
200        }
201    }
202
203    let scores = value_scores
204        .values()
205        .map(|(weight, values)| (*weight, values.len()))
206        .collect();
207    // If a key has a quorum, return it with its values.
208    if let Some((key, (_, values))) = value_scores
209        .into_iter()
210        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
211    {
212        return Ok((key, values));
213    }
214
215    if error_scores.is_empty() {
216        return Err(CommunicationError::NoConsensus(
217            committee.quorum_threshold(),
218            scores,
219        ));
220    }
221
222    // No specific error is available to report reliably.
223    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
224    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
225    sample.truncate(4);
226    Err(CommunicationError::Sample(sample))
227}
228
229impl<Env> ValidatorUpdater<Env>
230where
231    Env: Environment + 'static,
232{
233    #[instrument(
234        level = "trace", skip_all, err(level = Level::WARN),
235        fields(chain_id = %certificate.block().header.chain_id)
236    )]
237    async fn send_confirmed_certificate(
238        &mut self,
239        certificate: GenericCertificate<ConfirmedBlock>,
240        delivery: CrossChainMessageDelivery,
241    ) -> Result<Box<ChainInfo>, chain_client::Error> {
242        let mut result = self
243            .remote_node
244            .handle_optimized_confirmed_certificate(&certificate, delivery)
245            .await;
246
247        let mut sent_admin_chain = false;
248        let mut sent_blobs = false;
249        loop {
250            match result {
251                Err(NodeError::EventsNotFound(event_ids))
252                    if !sent_admin_chain
253                        && certificate.inner().chain_id() != self.admin_id
254                        && event_ids.iter().all(|event_id| {
255                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
256                                && event_id.chain_id == self.admin_id
257                        }) =>
258                {
259                    // The validator doesn't have the committee that signed the certificate.
260                    self.update_admin_chain().await?;
261                    sent_admin_chain = true;
262                }
263                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
264                    // The validator is missing the blobs required by the certificate.
265                    self.remote_node
266                        .check_blobs_not_found(&certificate, &blob_ids)?;
267                    // The certificate is confirmed, so the blobs must be in storage.
268                    let maybe_blobs = self
269                        .client
270                        .local_node
271                        .read_blobs_from_storage(&blob_ids)
272                        .await?;
273                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
274                    self.remote_node.node.upload_blobs(blobs).await?;
275                    sent_blobs = true;
276                }
277                result => return Ok(result?),
278            }
279            result = self
280                .remote_node
281                .handle_confirmed_certificate(certificate.clone(), delivery)
282                .await;
283        }
284    }
285
286    async fn send_validated_certificate(
287        &mut self,
288        certificate: GenericCertificate<ValidatedBlock>,
289        delivery: CrossChainMessageDelivery,
290    ) -> Result<Box<ChainInfo>, chain_client::Error> {
291        let result = self
292            .remote_node
293            .handle_optimized_validated_certificate(&certificate, delivery)
294            .await;
295
296        let chain_id = certificate.inner().chain_id();
297        match &result {
298            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
299                self.remote_node
300                    .check_blobs_not_found(&certificate, blob_ids)?;
301                // The certificate is for a validated block, i.e. for our locking block.
302                // Take the missing blobs from our local chain manager.
303                let blobs = self
304                    .client
305                    .local_node
306                    .get_locking_blobs(blob_ids, chain_id)
307                    .await?
308                    .ok_or_else(|| original_err.clone())?;
309                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
310            }
311            Err(error) => {
312                self.sync_if_needed(
313                    chain_id,
314                    certificate.round,
315                    certificate.block().header.height,
316                    error,
317                )
318                .await?;
319            }
320            _ => return Ok(result?),
321        }
322        Ok(self
323            .remote_node
324            .handle_validated_certificate(certificate)
325            .await?)
326    }
327
328    /// Requests a vote for a timeout certificate for the given round from the remote node.
329    ///
330    /// If the remote node is not in that round or at that height yet, sends the chain information
331    /// to update it.
332    async fn request_timeout(
333        &mut self,
334        chain_id: ChainId,
335        round: Round,
336        height: BlockHeight,
337    ) -> Result<Box<ChainInfo>, chain_client::Error> {
338        let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
339        let result = self
340            .remote_node
341            .handle_chain_info_query(query.clone())
342            .await;
343        if let Err(err) = &result {
344            self.sync_if_needed(chain_id, round, height, err).await?;
345        }
346        Ok(result?)
347    }
348
349    /// Synchronizes either the local node or the remote node, if one of them is lagging behind.
350    async fn sync_if_needed(
351        &mut self,
352        chain_id: ChainId,
353        round: Round,
354        height: BlockHeight,
355        error: &NodeError,
356    ) -> Result<(), chain_client::Error> {
357        let address = &self.remote_node.address();
358        match error {
359            NodeError::WrongRound(validator_round) if *validator_round > round => {
360                tracing::debug!(
361                    address, %chain_id, %validator_round, %round,
362                    "validator is at a higher round; synchronizing",
363                );
364                self.client
365                    .synchronize_chain_state_from(&self.remote_node, chain_id)
366                    .await?;
367            }
368            NodeError::UnexpectedBlockHeight {
369                expected_block_height,
370                found_block_height,
371            } if expected_block_height > found_block_height => {
372                tracing::debug!(
373                    address,
374                    %chain_id,
375                    %expected_block_height,
376                    %found_block_height,
377                    "validator is at a higher height; synchronizing",
378                );
379                self.client
380                    .synchronize_chain_state_from(&self.remote_node, chain_id)
381                    .await?;
382            }
383            NodeError::WrongRound(validator_round) if *validator_round < round => {
384                tracing::debug!(
385                    address, %chain_id, %validator_round, %round,
386                    "validator is at a lower round; sending chain info",
387                );
388                self.send_chain_information(
389                    chain_id,
390                    height,
391                    CrossChainMessageDelivery::NonBlocking,
392                    None,
393                )
394                .await?;
395            }
396            NodeError::UnexpectedBlockHeight {
397                expected_block_height,
398                found_block_height,
399            } if expected_block_height < found_block_height => {
400                tracing::debug!(
401                    address,
402                    %chain_id,
403                    %expected_block_height,
404                    %found_block_height,
405                    "Validator is at a lower height; sending chain info.",
406                );
407                self.send_chain_information(
408                    chain_id,
409                    height,
410                    CrossChainMessageDelivery::NonBlocking,
411                    None,
412                )
413                .await?;
414            }
415            NodeError::InactiveChain(chain_id) => {
416                tracing::debug!(
417                    address,
418                    %chain_id,
419                    "Validator has inactive chain; sending chain info.",
420                );
421                self.send_chain_information(
422                    *chain_id,
423                    height,
424                    CrossChainMessageDelivery::NonBlocking,
425                    None,
426                )
427                .await?;
428            }
429            _ => {}
430        }
431        Ok(())
432    }
433
    /// Sends a block proposal to the validator and returns the resulting chain info.
    ///
    /// The proposal is submitted in a loop: whenever the validator answers with an error
    /// that indicates missing data (wrong round or height, a missing cross-chain update,
    /// missing events, missing blobs or an inactive chain), the relevant chain information
    /// or blobs are sent and the proposal is submitted again. Any other error is returned
    /// immediately.
    ///
    /// `blob_ids` is consulted when the validator reports missing blobs or an inactive
    /// chain; it is emptied the first time that happens, so that remedy runs at most once.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // The highest height already sent per origin chain, to avoid resending the same
        // cross-chain update forever.
        let mut sent_cross_chain_updates = BTreeMap::new();
        // The publisher chains already sent in response to `EventsNotFound`.
        let mut publisher_chain_ids_sent = BTreeSet::new();
        loop {
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                // NOTE(review): this arm has no retry bound. If the validator keeps
                // answering `WrongRound` after receiving the chain information, the loop
                // would apparently only end when the caller drops this future -- confirm.
                Err(NodeError::WrongRound(_round)) => {
                    // The proposal is for a different round, so we need to update the validator.
                    // TODO: this should probably be more specific as to which rounds are retried.
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // Only handled if the validator is behind and the height it rejected is
                // the height of our proposal.
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    // The proposal is for a later block height, so we need to update the validator.
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // Only handled if the update is for the proposal's chain and we have not
                // already sent this origin chain up to the requested height.
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // Some received certificates may be missing for this validator
                    // (e.g. to create the chain or make the balance sufficient) so we are going to
                    // synchronize them now and retry.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    // The publisher chains of the missing events that we have not sent yet.
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    // If every publisher chain was already sent, sending them again would
                    // not help: give up with the validator's error.
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // `blob_ids` is emptied below, so this arm is taken at most once; a
                // repeated error then falls through to the final arm.
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // For `BlobsNotFound`, we assume that the local node should already be
                    // updated with the needed blobs, so sending the chain information about the
                    // certificates that last used the blobs to the validator node should be enough.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    // Blobs published by the proposal itself are sent directly as pending
                    // blobs; the remaining ones are looked up through their blob states.
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // Ask the validator which of the remaining blobs it is missing. Taking
                    // the vector leaves `blob_ids` empty.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;
                    let blob_states = self
                        .client
                        .local_node
                        .read_blob_states_from_storage(&missing_blob_ids)
                        .await?;
                    // For every chain recorded in a blob state, the highest height (plus
                    // one) that has to be sent to cover all of its blobs.
                    let mut chain_heights = BTreeMap::new();
                    for blob_state in blob_states {
                        let block_chain_id = blob_state.chain_id;
                        let block_height = blob_state.block_height.try_add_one()?;
                        chain_heights
                            .entry(block_chain_id)
                            .and_modify(|h| *h = block_height.max(*h))
                            .or_insert(block_height);
                    }
                    tracing::debug!("Sending chains {chain_heights:?}");

                    self.send_chain_info_up_to_heights(
                        chain_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // Fail immediately on other errors.
                Err(err) => return Err(err.into()),
            }
        }
    }
589
590    async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
591        let local_admin_info = self.client.local_node.chain_info(self.admin_id).await?;
592        Box::pin(self.send_chain_information(
593            self.admin_id,
594            local_admin_info.next_block_height,
595            CrossChainMessageDelivery::NonBlocking,
596            None,
597        ))
598        .await
599    }
600
601    pub async fn send_chain_information(
602        &mut self,
603        chain_id: ChainId,
604        target_block_height: BlockHeight,
605        delivery: CrossChainMessageDelivery,
606        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
607    ) -> Result<(), chain_client::Error> {
608        let info = if let Ok(height) = target_block_height.try_sub_one() {
609            // Figure out which certificates this validator is missing. In many cases, it's just the
610            // last one, so we optimistically send that one right away.
611            let certificate = if let Some(cert) = latest_certificate {
612                cert
613            } else {
614                let hash = self
615                    .client
616                    .local_node
617                    .get_block_hashes(chain_id, vec![height])
618                    .await?
619                    .into_iter()
620                    .next()
621                    .ok_or_else(|| {
622                        chain_client::Error::InternalError(
623                            "send_chain_information called with invalid target_block_height",
624                        )
625                    })?;
626                self.client
627                    .local_node
628                    .storage_client()
629                    .read_certificate(hash)
630                    .await?
631                    .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?
632            };
633            let info = match self.send_confirmed_certificate(certificate, delivery).await {
634                Ok(info) => info,
635                Err(error) => {
636                    tracing::debug!(
637                        address = self.remote_node.address(), %error,
638                        "validator failed to handle confirmed certificate; sending whole chain",
639                    );
640                    let query = ChainInfoQuery::new(chain_id);
641                    self.remote_node.handle_chain_info_query(query).await?
642                }
643            };
644            // Obtain the missing blocks and the manager state from the local node.
645            let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
646                .map(BlockHeight)
647                .collect();
648            let validator_missing_hashes = self
649                .client
650                .local_node
651                .get_block_hashes(chain_id, heights)
652                .await?;
653            if !validator_missing_hashes.is_empty() {
654                // Send the requested certificates in order.
655                let certificates = self
656                    .client
657                    .local_node
658                    .storage_client()
659                    .read_certificates(validator_missing_hashes.clone())
660                    .await?;
661                let certificates =
662                    match ResultReadCertificates::new(certificates, validator_missing_hashes) {
663                        ResultReadCertificates::Certificates(certificates) => certificates,
664                        ResultReadCertificates::InvalidHashes(hashes) => {
665                            return Err(chain_client::Error::ReadCertificatesError(hashes))
666                        }
667                    };
668                for certificate in certificates {
669                    self.send_confirmed_certificate(certificate, delivery)
670                        .await?;
671                }
672            }
673            info
674        } else {
675            // The remote node might not know about the chain yet.
676            let blob_states = self
677                .client
678                .local_node
679                .read_blob_states_from_storage(&[BlobId::new(
680                    chain_id.0,
681                    BlobType::ChainDescription,
682                )])
683                .await?;
684            let mut chain_heights = BTreeMap::new();
685            for blob_state in blob_states {
686                let block_chain_id = blob_state.chain_id;
687                let block_height = blob_state.block_height.try_add_one()?;
688                chain_heights
689                    .entry(block_chain_id)
690                    .and_modify(|h| *h = block_height.max(*h))
691                    .or_insert(block_height);
692            }
693            self.send_chain_info_up_to_heights(
694                chain_heights,
695                CrossChainMessageDelivery::NonBlocking,
696            )
697            .await?;
698            let query = ChainInfoQuery::new(chain_id);
699            self.remote_node.handle_chain_info_query(query).await?
700        };
701        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
702        let query = ChainInfoQuery::new(chain_id).with_manager_values();
703        let local_info = match self.client.local_node.handle_chain_info_query(query).await {
704            Ok(response) => response.info,
705            // We don't have the full chain description.
706            Err(LocalNodeError::BlobsNotFound(_)) => return Ok(()),
707            Err(error) => return Err(error.into()),
708        };
709        let manager = local_info.manager;
710        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
711            return Ok(());
712        }
713        // The remote node is at our height but not at the current round. Send it the proposal,
714        // validated block certificate or timeout certificate that proves the current round.
715        for proposal in manager
716            .requested_proposed
717            .into_iter()
718            .chain(manager.requested_signed_proposal)
719        {
720            if proposal.content.round == manager.current_round {
721                if let Err(error) = self.remote_node.handle_block_proposal(proposal).await {
722                    tracing::info!(%error, "failed to send block proposal");
723                } else {
724                    return Ok(());
725                }
726            }
727        }
728        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.map(|b| *b) {
729            if validated.round == manager.current_round {
730                if let Err(error) = self
731                    .remote_node
732                    .handle_optimized_validated_certificate(
733                        &validated,
734                        CrossChainMessageDelivery::NonBlocking,
735                    )
736                    .await
737                {
738                    tracing::info!(%error, "failed to send locking block");
739                } else {
740                    return Ok(());
741                }
742            }
743        }
744        if let Some(cert) = manager.timeout {
745            if cert.round >= remote_round {
746                tracing::debug!(round = %cert.round, "sending timeout");
747                self.remote_node.handle_timeout_certificate(*cert).await?;
748            }
749        }
750        Ok(())
751    }
752
753    async fn send_chain_info_up_to_heights(
754        &mut self,
755        chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
756        delivery: CrossChainMessageDelivery,
757    ) -> Result<(), chain_client::Error> {
758        FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
759            let mut updater = self.clone();
760            async move {
761                updater
762                    .send_chain_information(chain_id, height, delivery, None)
763                    .await
764            }
765        }))
766        .try_collect::<Vec<_>>()
767        .await?;
768        Ok(())
769    }
770
771    pub async fn send_chain_update(
772        &mut self,
773        action: CommunicateAction,
774    ) -> Result<LiteVote, chain_client::Error> {
775        let chain_id = match &action {
776            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
777            CommunicateAction::FinalizeBlock { certificate, .. } => {
778                certificate.inner().block().header.chain_id
779            }
780            CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
781        };
782        // Send the block proposal, certificate or timeout request and return a vote.
783        let vote = match action {
784            CommunicateAction::SubmitBlock { proposal, blob_ids } => {
785                let info = self.send_block_proposal(proposal, blob_ids).await?;
786                info.manager.pending.ok_or_else(|| {
787                    NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
788                })?
789            }
790            CommunicateAction::FinalizeBlock {
791                certificate,
792                delivery,
793            } => {
794                let info = self
795                    .send_validated_certificate(*certificate, delivery)
796                    .await?;
797                info.manager.pending.ok_or_else(|| {
798                    NodeError::MissingVoteInValidatorResponse("finalize a block".into())
799                })?
800            }
801            CommunicateAction::RequestTimeout { round, height, .. } => {
802                let info = self.request_timeout(chain_id, round, height).await?;
803                info.manager.timeout_vote.ok_or_else(|| {
804                    NodeError::MissingVoteInValidatorResponse("request a timeout".into())
805                })?
806            }
807        };
808        vote.check(self.remote_node.public_key)?;
809        Ok(vote)
810    }
811}