linera_core/updater.rs

// Copyright (c) Facebook, Inc. and its affiliates.
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    fmt,
    hash::Hash,
    mem,
    sync::Arc,
};

use futures::{
    stream::{FuturesUnordered, TryStreamExt},
    Future, StreamExt,
};
use linera_base::{
    crypto::ValidatorPublicKey,
    data_types::{BlockHeight, Round, TimeDelta},
    ensure,
    identifiers::{BlobId, BlobType, ChainId, StreamId},
    time::{timer::timeout, Duration, Instant},
};
use linera_chain::{
    data_types::{BlockProposal, LiteVote},
    manager::LockingBlock,
    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
};
use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
use linera_storage::{Clock, ResultReadCertificates, Storage};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::{instrument, Level};

use crate::{
    client::{chain_client, Client},
    data_types::{ChainInfo, ChainInfoQuery},
    environment::Environment,
    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
    remote_node::RemoteNode,
    LocalNodeError,
};

/// The default amount of time we wait for additional validators to contribute
/// to the result, as a fraction of how long it took to reach a quorum. For
/// example, if reaching a quorum took 10 seconds, the default grace period
/// waits up to 2 more seconds for stragglers.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;

/// A report of clock skew from a validator, sent before retrying due to `InvalidTimestamp`.
pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
/// The maximum timeout for requests to a stake-weighted quorum if no quorum is reached.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); // 1 day.

/// Used for `communicate_chain_action`.
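///
/// A minimal sketch of constructing an action (field values are placeholders):
///
/// ```ignore
/// let action = CommunicateAction::RequestTimeout {
///     chain_id,
///     height: BlockHeight(0),
///     round: Round::SingleLeader(1),
/// };
/// assert_eq!(action.round(), Round::SingleLeader(1));
/// ```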
#[derive(Clone)]
pub enum CommunicateAction {
    SubmitBlock {
        proposal: Box<BlockProposal>,
        blob_ids: Vec<BlobId>,
        /// Channel to report clock skew before sleeping, so the caller can aggregate reports.
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    },
    FinalizeBlock {
        certificate: Box<ValidatedBlockCertificate>,
        delivery: CrossChainMessageDelivery,
    },
    RequestTimeout {
        chain_id: ChainId,
        height: BlockHeight,
        round: Round,
    },
}

impl CommunicateAction {
    /// The round to which this action pertains.
    pub fn round(&self) -> Round {
        match self {
            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
            CommunicateAction::RequestTimeout { round, .. } => *round,
        }
    }
}

pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    pub client: Arc<Client<Env>>,
    pub admin_id: ChainId,
}

impl<Env: Environment> Clone for ValidatorUpdater<Env> {
    fn clone(&self) -> Self {
        ValidatorUpdater {
            remote_node: self.remote_node.clone(),
            client: self.client.clone(),
            admin_id: self.admin_id,
        }
    }
}

/// An error result for requests to a stake-weighted quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No consensus is possible since validators returned different possibilities
    /// for the next block.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// A single error that was returned by a sufficient number of nodes to be trusted as
    /// valid.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold, so we return a sample of
    /// errors for debugging purposes, together with their weights.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}

/// Executes a sequence of actions in parallel for all validators.
///
/// Tries to stop early when a quorum is reached. Once it is, the remaining validators
/// are given a grace period to contribute to the result, calculated as
/// `quorum_grace_period` (typically `DEFAULT_QUORUM_GRACE_PERIOD`) times the time it
/// took to reach the quorum.
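///
/// # Example
///
/// A minimal sketch (not compiled here); `validator_clients`, `committee` and `query`
/// are assumed to be in scope:
///
/// ```ignore
/// // Query every validator for chain info; group responses by the reported
/// // next block height and return once a quorum agrees on a single height.
/// // The `execute` closure must return a `Result<_, chain_client::Error>`.
/// let (height, responses) = communicate_with_quorum(
///     &validator_clients,
///     &committee,
///     |info| info.next_block_height,
///     |node| {
///         let query = query.clone();
///         async move { node.handle_chain_info_query(query).await.map_err(Into::into) }
///     },
///     DEFAULT_QUORUM_GRACE_PERIOD,
/// )
/// .await?;
/// ```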
pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
    validator_clients: &'a [RemoteNode<A>],
    committee: &Committee,
    group_by: G,
    execute: F,
    // Grace period as a fraction of time taken to reach quorum.
    quorum_grace_period: f64,
) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
where
    A: ValidatorNode + Clone + 'static,
    F: Clone + Fn(RemoteNode<A>) -> R,
    R: Future<Output = Result<V, chain_client::Error>> + 'a,
    G: Fn(&V) -> K,
    K: Hash + PartialEq + Eq + Clone + 'static,
    V: 'static,
{
    let mut responses: FuturesUnordered<_> = validator_clients
        .iter()
        .filter_map(|remote_node| {
            if committee.weight(&remote_node.public_key) == 0 {
                // This should not happen, but we filter such nodes out anyway:
                // certificates are not allowed to include votes with weight 0.
                return None;
            }
            let execute = execute.clone();
            let remote_node = remote_node.clone();
            Some(async move { (remote_node.public_key, execute(remote_node).await) })
        })
        .collect();

    let start_time = Instant::now();
    let mut end_time: Option<Instant> = None;
    let mut remaining_votes = committee.total_votes();
    let mut highest_key_score = 0;
    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
    let mut error_scores = HashMap::new();

    'vote_wait: while let Ok(Some((name, result))) = timeout(
        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
        responses.next(),
    )
    .await
    {
        remaining_votes -= committee.weight(&name);
        match result {
            Ok(value) => {
                let key = group_by(&value);
                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
                entry.0 += committee.weight(&name);
                entry.1.push((name, value));
                highest_key_score = highest_key_score.max(entry.0);
            }
            Err(err) => {
                // TODO(#2857): Handle non-remote errors properly.
                let err = match err {
                    chain_client::Error::RemoteNodeError(err) => err,
                    err => NodeError::ResponseHandlingError {
                        error: err.to_string(),
                    },
                };
                let entry = error_scores.entry(err.clone()).or_insert(0);
                *entry += committee.weight(&name);
                if *entry >= committee.validity_threshold() {
                    // At least one honest node returned this error.
                    // No quorum can be reached, so return early.
                    return Err(CommunicationError::Trusted(err));
                }
            }
        }
        // If it becomes clear that no key can reach a quorum, break early.
        if highest_key_score + remaining_votes < committee.quorum_threshold() {
            break 'vote_wait;
        }

        // If a key reaches a quorum, wait for the grace period to collect more values
        // or error information and then stop.
        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
        }
    }

    let scores = value_scores
        .values()
        .map(|(weight, values)| (*weight, values.len()))
        .collect();
    // If a key has a quorum, return it with its values.
    if let Some((key, (_, values))) = value_scores
        .into_iter()
        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
    {
        return Ok((key, values));
    }

    if error_scores.is_empty() {
        return Err(CommunicationError::NoConsensus(
            committee.quorum_threshold(),
            scores,
        ));
    }

    // No specific error is available to report reliably.
    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
    sample.truncate(4);
    Err(CommunicationError::Sample(sample))
}

impl<Env> ValidatorUpdater<Env>
where
    Env: Environment + 'static,
{
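    /// Sends a confirmed block certificate to the remote node.
    ///
    /// If the validator reports missing epoch events or blobs, sends the admin chain or
    /// the blobs respectively and retries; each remedy is attempted at most once.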
    #[instrument(
        level = "trace", skip_all, err(level = Level::WARN),
        fields(chain_id = %certificate.block().header.chain_id)
    )]
    async fn send_confirmed_certificate(
        &mut self,
        certificate: GenericCertificate<ConfirmedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let mut result = self
            .remote_node
            .handle_optimized_confirmed_certificate(&certificate, delivery)
            .await;

        let mut sent_admin_chain = false;
        let mut sent_blobs = false;
        loop {
            match result {
                Err(NodeError::EventsNotFound(event_ids))
                    if !sent_admin_chain
                        && certificate.inner().chain_id() != self.admin_id
                        && event_ids.iter().all(|event_id| {
                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
                                && event_id.chain_id == self.admin_id
                        }) =>
                {
                    // The validator doesn't have the committee that signed the certificate.
                    self.update_admin_chain().await?;
                    sent_admin_chain = true;
                }
                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
                    // The validator is missing the blobs required by the certificate.
                    self.remote_node
                        .check_blobs_not_found(&certificate, &blob_ids)?;
                    // The certificate is confirmed, so the blobs must be in storage.
                    let maybe_blobs = self
                        .client
                        .local_node
                        .read_blobs_from_storage(&blob_ids)
                        .await?;
                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
                    self.remote_node.node.upload_blobs(blobs).await?;
                    sent_blobs = true;
                }
                result => return Ok(result?),
            }
            result = self
                .remote_node
                .handle_confirmed_certificate(certificate.clone(), delivery)
                .await;
        }
    }

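    /// Sends a validated block certificate to the remote node.
    ///
    /// If the validator reports missing blobs, sends the blobs from our local locking
    /// block and retries; on other errors, first synchronizes whichever side is behind.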
    async fn send_validated_certificate(
        &mut self,
        certificate: GenericCertificate<ValidatedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let result = self
            .remote_node
            .handle_optimized_validated_certificate(&certificate, delivery)
            .await;

        let chain_id = certificate.inner().chain_id();
        match &result {
            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
                self.remote_node
                    .check_blobs_not_found(&certificate, blob_ids)?;
                // The certificate is for a validated block, i.e. for our locking block.
                // Take the missing blobs from our local chain manager.
                let blobs = self
                    .client
                    .local_node
                    .get_locking_blobs(blob_ids, chain_id)
                    .await?
                    .ok_or_else(|| original_err.clone())?;
                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
            }
            Err(error) => {
                self.sync_if_needed(
                    chain_id,
                    certificate.round,
                    certificate.block().header.height,
                    error,
                )
                .await?;
            }
            _ => return Ok(result?),
        }
        Ok(self
            .remote_node
            .handle_validated_certificate(certificate)
            .await?)
    }

    /// Requests a vote for a timeout certificate for the given round from the remote node.
    ///
    /// If the remote node is not in that round or at that height yet, sends the chain information
    /// to update it.
    async fn request_timeout(
        &mut self,
        chain_id: ChainId,
        round: Round,
        height: BlockHeight,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
        let result = self
            .remote_node
            .handle_chain_info_query(query.clone())
            .await;
        if let Err(err) = &result {
            self.sync_if_needed(chain_id, round, height, err).await?;
        }
        Ok(result?)
    }

    /// Synchronizes the local node or the remote node, whichever is lagging behind.
    async fn sync_if_needed(
        &mut self,
        chain_id: ChainId,
        round: Round,
        height: BlockHeight,
        error: &NodeError,
    ) -> Result<(), chain_client::Error> {
        let address = &self.remote_node.address();
        match error {
            NodeError::WrongRound(validator_round) if *validator_round > round => {
                tracing::debug!(
                    address, %chain_id, %validator_round, %round,
                    "validator is at a higher round; synchronizing",
                );
                self.client
                    .synchronize_chain_state_from(&self.remote_node, chain_id)
                    .await?;
            }
            NodeError::UnexpectedBlockHeight {
                expected_block_height,
                found_block_height,
            } if expected_block_height > found_block_height => {
                tracing::debug!(
                    address,
                    %chain_id,
                    %expected_block_height,
                    %found_block_height,
                    "validator is at a higher height; synchronizing",
                );
                self.client
                    .synchronize_chain_state_from(&self.remote_node, chain_id)
                    .await?;
            }
            NodeError::WrongRound(validator_round) if *validator_round < round => {
                tracing::debug!(
                    address, %chain_id, %validator_round, %round,
                    "validator is at a lower round; sending chain info",
                );
                self.send_chain_information(
                    chain_id,
                    height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            NodeError::UnexpectedBlockHeight {
                expected_block_height,
                found_block_height,
            } if expected_block_height < found_block_height => {
                tracing::debug!(
                    address,
                    %chain_id,
                    %expected_block_height,
                    %found_block_height,
                    "validator is at a lower height; sending chain info",
                );
                self.send_chain_information(
                    chain_id,
                    height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            NodeError::InactiveChain(chain_id) => {
                tracing::debug!(
                    address,
                    %chain_id,
                    "validator has an inactive chain; sending chain info",
                );
                self.send_chain_information(
                    *chain_id,
                    height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            _ => {}
        }
        Ok(())
    }

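    /// Sends a block proposal to the remote node, retrying until the proposal is
    /// handled or an unrecoverable error occurs.
    ///
    /// Recoverable errors include wrong rounds or heights, missing cross-chain updates,
    /// events or blobs, and a validator clock that is behind the block's timestamp;
    /// each is remedied by sending the missing data or waiting before retrying.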
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        let mut sent_cross_chain_updates = BTreeMap::new();
        let mut publisher_chain_ids_sent = BTreeSet::new();
        let storage = self.client.local_node.storage_client();
        loop {
            let local_time = storage.clock().current_time();
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                Err(NodeError::WrongRound(_round)) => {
                    // The proposal is for a different round, so we need to update the validator.
                    // TODO: this should probably be more specific as to which rounds are retried.
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    // The proposal is for a later block height, so we need to update the validator.
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "missing cross-chain update; sending chain to validator",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // Some received certificates may be missing for this validator
                    // (e.g. to create the chain or make the balance sufficient), so we
                    // synchronize them now and retry.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("missing blobs");
                    // For `BlobsNotFound`, we assume the local node already has the needed
                    // blobs, so it should be enough to send the validator the chain
                    // information for the certificates that last used those blobs.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;

                    tracing::debug!("sending chains for missing blobs");
                    self.send_chain_info_for_blobs(
                        &missing_blob_ids,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::InvalidTimestamp {
                    block_timestamp,
                    local_time: validator_local_time,
                    ..
                }) => {
                    // The validator's clock is behind the block's timestamp. We need to
                    // wait for two things:
                    // 1. Our clock to reach block_timestamp (in case the block timestamp
                    //    is in the future from our perspective too).
                    // 2. The validator's clock to catch up (in case of clock skew between
                    //    us and the validator).
                    let clock_skew = local_time.delta_since(validator_local_time);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %block_timestamp,
                        ?clock_skew,
                        "validator's clock is behind; waiting and retrying",
                    );
                    // Report the clock skew before sleeping, so the caller can aggregate.
                    // The receiver may have been dropped if the caller is no longer interested.
                    clock_skew_sender
                        .send((self.remote_node.public_key, clock_skew))
                        .ok();
                    storage
                        .clock()
                        .sleep_until(block_timestamp.saturating_add(clock_skew))
                        .await;
                }
                // Fail immediately on other errors.
                Err(err) => return Err(err.into()),
            }
        }
    }

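    /// Brings the validator up to date with the admin chain, up to our local block height.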
    async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
        let local_admin_info = self.client.local_node.chain_info(self.admin_id).await?;
        Box::pin(self.send_chain_information(
            self.admin_id,
            local_admin_info.next_block_height,
            CrossChainMessageDelivery::NonBlocking,
            None,
        ))
        .await
    }

    /// Sends chain information to bring a validator up to date with a specific chain.
    ///
    /// This method performs a two-phase synchronization:
    /// 1. **Height synchronization**: Ensures the validator has all blocks up to `target_block_height`.
    /// 2. **Round synchronization**: If heights match, ensures the validator has proposals/certificates
    ///    for the current consensus round.
    ///
    /// # Height Sync Strategy
    /// - For existing chains (`target_block_height > 0`):
    ///   * Optimistically sends the last certificate first (often that's all that's missing).
    ///   * Falls back to a full chain query if the validator needs more context.
    ///   * Sends any additional missing certificates in order.
    /// - For new chains (`target_block_height == 0`):
    ///   * Sends the chain description and dependencies first.
    ///   * Then queries the validator's state.
    ///
    /// # Round Sync Strategy
    /// Once heights match, if the local node is at a higher round, sends the evidence
    /// (proposal, validated block, or timeout certificate) that proves the current round.
    ///
    /// # Parameters
    /// - `chain_id`: The chain to synchronize.
    /// - `target_block_height`: The height the validator should reach.
    /// - `delivery`: Message delivery mode (blocking or non-blocking).
    /// - `latest_certificate`: Optional certificate at `target_block_height - 1`, to avoid a storage lookup.
    ///
    /// # Returns
    /// - `Ok(())` if synchronization completed successfully or the validator is already up to date.
    /// - `Err` if there was a communication or storage error.
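    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); `updater` and `chain_id` are assumed to be
    /// in scope:
    ///
    /// ```ignore
    /// // Bring the validator up to date through block height 10 on this chain.
    /// updater
    ///     .send_chain_information(
    ///         chain_id,
    ///         BlockHeight(10),
    ///         CrossChainMessageDelivery::NonBlocking,
    ///         None,
    ///     )
    ///     .await?;
    /// ```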
    #[instrument(level = "trace", skip_all)]
    pub async fn send_chain_information(
        &mut self,
        chain_id: ChainId,
        target_block_height: BlockHeight,
        delivery: CrossChainMessageDelivery,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<(), chain_client::Error> {
        // Phase 1: Height synchronization
        let info = if target_block_height.0 > 0 {
            self.sync_chain_height(chain_id, target_block_height, delivery, latest_certificate)
                .await?
        } else {
            self.initialize_new_chain_on_validator(chain_id).await?
        };

        // Phase 2: Round synchronization (if needed)
        // Height synchronization is complete. Now check if we need to synchronize
        // the consensus round at this height.
        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
        let query = ChainInfoQuery::new(chain_id).with_manager_values();
        let local_info = match self.client.local_node.handle_chain_info_query(query).await {
            Ok(response) => response.info,
            // If we don't have the full chain description locally, we can't help the
            // validator with round synchronization. This is not an error: the validator
            // should retry later once the chain is fully initialized locally.
            Err(LocalNodeError::BlobsNotFound(_)) => {
                tracing::debug!("local chain description not fully available; skipping round sync");
                return Ok(());
            }
            Err(error) => return Err(error.into()),
        };

        let manager = local_info.manager;
        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
            return Ok(());
        }

        // The validator is at our height but behind on the consensus round.
        self.sync_consensus_round(remote_round, &manager).await
    }

    /// Synchronizes a validator to a specific block height by sending missing certificates.
    ///
    /// Uses an optimistic approach: sends the last certificate first, then fills in any gaps.
    ///
    /// Returns the [`ChainInfo`] from the validator after synchronization.
    async fn sync_chain_height(
        &mut self,
        chain_id: ChainId,
        target_block_height: BlockHeight,
        delivery: CrossChainMessageDelivery,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let height = target_block_height.try_sub_one()?;

        // Get the certificate for the last block we want to send.
        let certificate = if let Some(cert) = latest_certificate {
            cert
        } else {
            let hash = self
                .client
                .local_node
                .get_block_hashes(chain_id, vec![height])
                .await?
                .into_iter()
                .next()
                .ok_or_else(|| {
                    chain_client::Error::InternalError(
                        "send_chain_information called with invalid target_block_height",
                    )
                })?;
            self.client
                .local_node
                .storage_client()
                .read_certificate(hash)
                .await?
                .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?
        };

        // Optimistically try sending just the last certificate.
        let info = match self.send_confirmed_certificate(certificate, delivery).await {
            Ok(info) => info,
            Err(error) => {
                tracing::debug!(
                    address = self.remote_node.address(), %error,
                    "validator failed to handle confirmed certificate; sending whole chain",
                );
                let query = ChainInfoQuery::new(chain_id);
                self.remote_node.handle_chain_info_query(query).await?
            }
        };

        // Calculate which block heights the validator is still missing.
        let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
            .map(BlockHeight)
            .collect();

        if heights.is_empty() {
            return Ok(info);
        }

        // Send any additional missing certificates in order.
        let validator_missing_hashes = self
            .client
            .local_node
            .get_block_hashes(chain_id, heights)
            .await?;

        let certificates = self
            .client
            .local_node
            .storage_client()
            .read_certificates(validator_missing_hashes.clone())
            .await?;
        let certificates = match ResultReadCertificates::new(certificates, validator_missing_hashes)
        {
            ResultReadCertificates::Certificates(certificates) => certificates,
            ResultReadCertificates::InvalidHashes(hashes) => {
                return Err(chain_client::Error::ReadCertificatesError(hashes))
            }
        };
        for certificate in certificates {
            self.send_confirmed_certificate(certificate, delivery)
                .await?;
        }

        Ok(info)
    }

    /// Initializes a new chain on the validator by sending the chain description and dependencies.
    ///
    /// This is called when the validator doesn't know about the chain yet.
    ///
    /// Returns the [`ChainInfo`] from the validator after initialization.
    async fn initialize_new_chain_on_validator(
        &mut self,
        chain_id: ChainId,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        // Send the chain description and all dependency chains.
        self.send_chain_info_for_blobs(
            &[BlobId::new(chain_id.0, BlobType::ChainDescription)],
            CrossChainMessageDelivery::NonBlocking,
        )
        .await?;

        // Query the validator's state for this chain.
        let query = ChainInfoQuery::new(chain_id);
        let info = self.remote_node.handle_chain_info_query(query).await?;
        Ok(info)
    }

    /// Synchronizes the consensus round state with the validator.
    ///
    /// If the validator is at the same height but an earlier round, sends the evidence
    /// (proposal, validated block, or timeout certificate) that justifies the current round.
    ///
    /// This is a best-effort operation: failures are logged but don't fail the entire sync.
    async fn sync_consensus_round(
        &mut self,
        remote_round: Round,
        manager: &linera_chain::manager::ChainManagerInfo,
    ) -> Result<(), chain_client::Error> {
        // Try to send a proposal for the current round.
        for proposal in manager
            .requested_proposed
            .iter()
            .chain(manager.requested_signed_proposal.iter())
        {
            if proposal.content.round == manager.current_round {
                match self
                    .remote_node
                    .handle_block_proposal(proposal.clone())
                    .await
                {
                    Ok(_) => {
                        tracing::debug!("successfully sent block proposal for round sync");
                        return Ok(());
                    }
                    Err(error) => {
                        tracing::debug!(%error, "failed to send block proposal");
                    }
                }
            }
        }

        // Try to send a validated block for the current round.
        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.as_deref() {
            if validated.round == manager.current_round {
                match self
                    .remote_node
                    .handle_optimized_validated_certificate(
                        validated,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await
                {
                    Ok(_) => {
                        tracing::debug!("successfully sent validated block for round sync");
                        return Ok(());
                    }
                    Err(error) => {
                        tracing::debug!(%error, "failed to send validated block");
                    }
                }
            }
        }

        // Try to send a timeout certificate.
        if let Some(cert) = &manager.timeout {
            if cert.round >= remote_round {
                match self
                    .remote_node
                    .handle_timeout_certificate(cert.as_ref().clone())
                    .await
                {
                    Ok(_) => {
                        tracing::debug!(round = %cert.round, "successfully sent timeout certificate");
                        return Ok(());
                    }
                    Err(error) => {
                        tracing::debug!(%error, round = %cert.round, "failed to send timeout certificate");
                    }
                }
            }
        }

        // If we reach here, either we had no round sync data to send, or all attempts failed.
        // This is not a fatal error: height sync succeeded, which is the primary goal.
        tracing::debug!("round sync not performed: no applicable data or all attempts failed");
        Ok(())
    }

    /// Sends chain information for all chains referenced by the given blobs.
    ///
    /// Reads blob states from storage, determines the specific chain heights needed,
    /// and sends chain information for those heights. With sparse chains, this only
    /// sends the specific blocks containing the blobs, not all blocks up to those heights.
    async fn send_chain_info_for_blobs(
        &mut self,
        blob_ids: &[BlobId],
        delivery: CrossChainMessageDelivery,
    ) -> Result<(), chain_client::Error> {
        let blob_states = self
            .client
            .local_node
            .read_blob_states_from_storage(blob_ids)
            .await?;

        let mut chain_heights: BTreeMap<ChainId, BTreeSet<BlockHeight>> = BTreeMap::new();
        for blob_state in blob_states {
            let block_chain_id = blob_state.chain_id;
            let block_height = blob_state.block_height;
            chain_heights
                .entry(block_chain_id)
                .or_default()
                .insert(block_height);
        }

        self.send_chain_info_at_heights(chain_heights, delivery)
            .await
    }

    /// Sends chain information for specific heights on multiple chains.
    ///
    /// Unlike `send_chain_info_up_to_heights`, this method only sends the blocks at the
    /// specified heights, not all blocks up to those heights. This is more efficient for
    /// sparse chains where only specific blocks are needed.
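    ///
    /// A minimal sketch of a call (not compiled here), sending only the blocks at
    /// heights 5 and 9 of one chain:
    ///
    /// ```ignore
    /// let heights = BTreeSet::from([BlockHeight(5), BlockHeight(9)]);
    /// updater
    ///     .send_chain_info_at_heights([(chain_id, heights)], delivery)
    ///     .await?;
    /// ```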
    async fn send_chain_info_at_heights(
        &mut self,
        chain_heights: impl IntoIterator<Item = (ChainId, BTreeSet<BlockHeight>)>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<(), chain_client::Error> {
        FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, heights)| {
            let mut updater = self.clone();
            async move {
                // Get all block hashes for this chain at the specified heights in one call.
                let heights_vec: Vec<_> = heights.into_iter().collect();
                let hashes = updater
                    .client
                    .local_node
                    .get_block_hashes(chain_id, heights_vec.clone())
                    .await?;

                if hashes.len() != heights_vec.len() {
                    return Err(chain_client::Error::InternalError(
                        "send_chain_info_at_heights called with invalid heights",
                    ));
                }

                // Read all certificates in one call.
                let certificates = updater
                    .client
                    .local_node
                    .storage_client()
                    .read_certificates(hashes.clone())
                    .await?;

                // Send each certificate.
                for (hash, certificate) in hashes.into_iter().zip(certificates) {
                    let certificate = certificate
                        .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?;
                    updater
                        .send_confirmed_certificate(certificate, delivery)
                        .await?;
                }

                Ok::<_, chain_client::Error>(())
            }
        }))
        .try_collect::<Vec<_>>()
        .await?;
        Ok(())
    }

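    /// Sends chain information for multiple chains in parallel, bringing the validator
    /// up to the given height on each chain.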
    async fn send_chain_info_up_to_heights(
        &mut self,
        chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<(), chain_client::Error> {
        FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
            let mut updater = self.clone();
            async move {
                updater
                    .send_chain_information(chain_id, height, delivery, None)
                    .await
            }
        }))
        .try_collect::<Vec<_>>()
        .await?;
        Ok(())
    }

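    /// Performs the given `CommunicateAction` on the remote node and returns the
    /// resulting vote, after verifying its signature.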
    pub async fn send_chain_update(
        &mut self,
        action: CommunicateAction,
    ) -> Result<LiteVote, chain_client::Error> {
        let chain_id = match &action {
            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
            CommunicateAction::FinalizeBlock { certificate, .. } => {
                certificate.inner().block().header.chain_id
            }
            CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
        };
        // Send the block proposal, certificate or timeout request and return a vote.
        let vote = match action {
            CommunicateAction::SubmitBlock {
                proposal,
                blob_ids,
                clock_skew_sender,
            } => {
                let info = self
                    .send_block_proposal(proposal, blob_ids, clock_skew_sender)
                    .await?;
                info.manager.pending.ok_or_else(|| {
                    NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
                })?
            }
            CommunicateAction::FinalizeBlock {
                certificate,
                delivery,
            } => {
                let info = self
                    .send_validated_certificate(*certificate, delivery)
                    .await?;
                info.manager.pending.ok_or_else(|| {
                    NodeError::MissingVoteInValidatorResponse("finalize a block".into())
                })?
            }
            CommunicateAction::RequestTimeout { round, height, .. } => {
                let info = self.request_timeout(chain_id, round, height).await?;
                info.manager.timeout_vote.ok_or_else(|| {
                    NodeError::MissingVoteInValidatorResponse("request a timeout".into())
                })?
            }
        };
        vote.check(self.remote_node.public_key)?;
        Ok(vote)
    }
}