linera_core/
updater.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt,
8    hash::Hash,
9    mem,
10    sync::Arc,
11};
12
13use futures::{
14    stream::{FuturesUnordered, TryStreamExt},
15    Future, StreamExt,
16};
17use linera_base::{
18    crypto::ValidatorPublicKey,
19    data_types::{BlockHeight, Round, TimeDelta},
20    ensure,
21    identifiers::{BlobId, BlobType, ChainId, StreamId},
22    time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25    data_types::{BlockProposal, LiteVote},
26    manager::LockingBlock,
27    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{Clock, Storage};
31use thiserror::Error;
32use tokio::sync::mpsc;
33use tracing::{instrument, Level};
34
35use crate::{
36    client::{chain_client, Client},
37    data_types::{ChainInfo, ChainInfoQuery},
38    environment::Environment,
39    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
40    remote_node::RemoteNode,
41    LocalNodeError,
42};
43
44/// The default amount of time we wait for additional validators to contribute
45/// to the result, as a fraction of how long it took to reach a quorum.
46pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;
47
48/// A report of clock skew from a validator, sent before retrying due to `InvalidTimestamp`.
49pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
50/// The maximum timeout for requests to a stake-weighted quorum if no quorum is reached.
51const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); // 1 day.
52
/// Used for `communicate_chain_action`.
///
/// An action to be performed by the validators; [`CommunicateAction::round`] gives the
/// consensus round it pertains to.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs associated with the proposal. NOTE(review): presumably forwarded to
        /// `send_block_proposal`, which consults them when a validator reports missing blobs —
        /// confirm against the dispatcher.
        blob_ids: Vec<BlobId>,
        /// Channel to report clock skew before sleeping, so the caller can aggregate reports.
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    },
    /// Send a certificate for a validated block.
    FinalizeBlock {
        /// The certificate for the validated block.
        certificate: Box<ValidatedBlockCertificate>,
        /// How cross-chain messages should be delivered.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask for a timeout in `round` at `height` of chain `chain_id`.
    RequestTimeout {
        /// The chain to time out.
        chain_id: ChainId,
        /// The block height the timeout applies to.
        height: BlockHeight,
        /// The round the timeout applies to.
        round: Round,
    },
}
72
73impl CommunicateAction {
74    /// The round to which this action pertains.
75    pub fn round(&self) -> Round {
76        match self {
77            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
78            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
79            CommunicateAction::RequestTimeout { round, .. } => *round,
80        }
81    }
82}
83
/// Talks to a single validator (`remote_node`): sends it proposals, certificates and timeout
/// requests, and brings it up to date using data from the local node of `client`.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator this updater talks to.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The client whose local node supplies the chain state, certificates and blobs sent to the
    /// validator.
    pub client: Arc<Client<Env>>,
    /// The admin chain; it is sent to the validator when the validator lacks the committee that
    /// signed a certificate.
    pub admin_chain_id: ChainId,
}
92
93impl<Env: Environment> Clone for ValidatorUpdater<Env> {
94    fn clone(&self) -> Self {
95        ValidatorUpdater {
96            remote_node: self.remote_node.clone(),
97            client: self.client.clone(),
98            admin_chain_id: self.admin_chain_id,
99        }
100    }
101}
102
/// An error result for requests to a stake-weighted quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No consensus is possible since validators returned different possibilities
    /// for the next block.
    ///
    /// Fields: the quorum threshold; and, for each distinct response, the total weight of the
    /// validators that returned it together with how many validators returned it.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// A single error that was returned by a sufficient number of nodes to be trusted as
    /// valid.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold so we're returning a sample of
    /// errors for debugging purposes, together with their weight (heaviest first, as built by
    /// `communicate_with_quorum`).
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
121
122/// Executes a sequence of actions in parallel for all validators.
123///
124/// Tries to stop early when a quorum is reached. If `quorum_grace_period` is specified, other
125/// validators are given additional time to contribute to the result. The grace period is
126/// calculated as a fraction (defaulting to `DEFAULT_QUORUM_GRACE_PERIOD`) of the time taken to
127/// reach quorum.
128pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
129    validator_clients: &'a [RemoteNode<A>],
130    committee: &Committee,
131    group_by: G,
132    execute: F,
133    // Grace period as a fraction of time taken to reach quorum.
134    quorum_grace_period: f64,
135) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
136where
137    A: ValidatorNode + Clone + 'static,
138    F: Clone + Fn(RemoteNode<A>) -> R,
139    R: Future<Output = Result<V, chain_client::Error>> + 'a,
140    G: Fn(&V) -> K,
141    K: Hash + PartialEq + Eq + Clone + 'static,
142    V: 'static,
143{
144    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
145        .iter()
146        .filter_map(|remote_node| {
147            if committee.weight(&remote_node.public_key) == 0 {
148                // This should not happen but better prevent it because certificates
149                // are not allowed to include votes with weight 0.
150                return None;
151            }
152            let execute = execute.clone();
153            let remote_node = remote_node.clone();
154            Some(async move { (remote_node.public_key, execute(remote_node).await) })
155        })
156        .collect();
157
158    let start_time = Instant::now();
159    let mut end_time: Option<Instant> = None;
160    let mut remaining_votes = committee.total_votes();
161    let mut highest_key_score = 0;
162    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
163    let mut error_scores = HashMap::new();
164
165    'vote_wait: while let Ok(Some((name, result))) = timeout(
166        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
167        responses.next(),
168    )
169    .await
170    {
171        remaining_votes -= committee.weight(&name);
172        match result {
173            Ok(value) => {
174                let key = group_by(&value);
175                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
176                entry.0 += committee.weight(&name);
177                entry.1.push((name, value));
178                highest_key_score = highest_key_score.max(entry.0);
179            }
180            Err(err) => {
181                // TODO(#2857): Handle non-remote errors properly.
182                let err = match err {
183                    chain_client::Error::RemoteNodeError(err) => err,
184                    err => NodeError::ResponseHandlingError {
185                        error: err.to_string(),
186                    },
187                };
188                let entry = error_scores.entry(err.clone()).or_insert(0);
189                *entry += committee.weight(&name);
190            }
191        }
192        // If it becomes clear that no key can reach a quorum, break early.
193        if highest_key_score + remaining_votes < committee.quorum_threshold() {
194            break 'vote_wait;
195        }
196
197        // If a key reaches a quorum, wait for the grace period to collect more values
198        // or error information and then stop.
199        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
200            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
201        }
202    }
203
204    let scores = value_scores
205        .values()
206        .map(|(weight, values)| (*weight, values.len()))
207        .collect();
208    // If a key has a quorum, return it with its values.
209    if let Some((key, (_, values))) = value_scores
210        .into_iter()
211        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
212    {
213        return Ok((key, values));
214    }
215
216    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
217    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
218    sample.truncate(4);
219    Err(match sample.as_slice() {
220        [] => CommunicationError::NoConsensus(committee.quorum_threshold(), scores),
221        [(_, score), ..] if *score >= committee.validity_threshold() => {
222            // At least one honest validator returned this error.
223            CommunicationError::Trusted(sample.into_iter().next().unwrap().0)
224        }
225        // Otherwise no specific error is available to report reliably.}
226        _ => CommunicationError::Sample(sample),
227    })
228}
229
230impl<Env> ValidatorUpdater<Env>
231where
232    Env: Environment + 'static,
233{
234    /// Logs a warning if the error is not an expected part of the protocol flow.
235    fn warn_if_unexpected(&self, err: &NodeError) {
236        if !err.is_expected() {
237            tracing::warn!(
238                remote_node = self.remote_node.address(),
239                %err,
240                "unexpected error from validator",
241            );
242        }
243    }
244
    /// Sends a confirmed-block certificate to the validator, recovering from up to two kinds of
    /// missing data along the way.
    ///
    /// The first attempt goes through `handle_optimized_confirmed_certificate`. Each retry uses
    /// `handle_confirmed_certificate` with a clone of the `Arc`. Two failures are recovered from,
    /// each at most once, so at most three requests are made:
    /// - `EventsNotFound`, when every missing event is on the admin chain's epoch stream and the
    ///   certificate is not itself on the admin chain: the admin chain is sent to the validator.
    /// - `BlobsNotFound`: once `check_blobs_not_found` succeeds (its error is returned otherwise),
    ///   the blobs are read from local storage and uploaded to the validator.
    ///
    /// Any other result is returned as is; errors are first logged if they are unexpected.
    #[instrument(
        level = "trace", skip_all, err(level = Level::DEBUG),
        fields(chain_id = %certificate.block().header.chain_id)
    )]
    async fn send_confirmed_certificate(
        &mut self,
        certificate: &Arc<GenericCertificate<ConfirmedBlock>>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let mut result = self
            .remote_node
            .handle_optimized_confirmed_certificate(certificate, delivery)
            .await;

        // Each recovery below runs at most once; these flags bound the retry loop.
        let mut sent_admin_chain = false;
        let mut sent_blobs = false;
        loop {
            match result {
                Err(NodeError::EventsNotFound(event_ids))
                    if !sent_admin_chain
                        && certificate.inner().chain_id() != self.admin_chain_id
                        && event_ids.iter().all(|event_id| {
                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
                                && event_id.chain_id == self.admin_chain_id
                        }) =>
                {
                    // The validator doesn't have the committee that signed the certificate.
                    self.update_admin_chain().await?;
                    sent_admin_chain = true;
                }
                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
                    // The validator is missing the blobs required by the certificate.
                    self.remote_node
                        .check_blobs_not_found(certificate, &blob_ids)?;
                    // The certificate is confirmed, so the blobs must be in storage.
                    let maybe_blobs = self
                        .client
                        .local_node
                        .read_blobs_from_storage(&blob_ids)
                        .await?;
                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
                    self.remote_node.node.upload_blobs(blobs).await?;
                    sent_blobs = true;
                }
                // Success, an unrecoverable error, or a recovery that was already attempted.
                result => {
                    if let Err(err) = &result {
                        self.warn_if_unexpected(err);
                    }
                    return Ok(result?);
                }
            }
            // A recovery step was taken: retry via `handle_confirmed_certificate`.
            result = self
                .remote_node
                .handle_confirmed_certificate(certificate.clone(), delivery)
                .await;
        }
    }
302
303    async fn send_validated_certificate(
304        &mut self,
305        certificate: GenericCertificate<ValidatedBlock>,
306        delivery: CrossChainMessageDelivery,
307    ) -> Result<Box<ChainInfo>, chain_client::Error> {
308        let result = self
309            .remote_node
310            .handle_optimized_validated_certificate(&certificate, delivery)
311            .await;
312
313        let chain_id = certificate.inner().chain_id();
314        match &result {
315            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
316                self.remote_node
317                    .check_blobs_not_found(&certificate, blob_ids)?;
318                // The certificate is for a validated block, i.e. for our locking block.
319                // Take the missing blobs from our local chain manager.
320                let blobs = self
321                    .client
322                    .local_node
323                    .get_locking_blobs(blob_ids, chain_id)
324                    .await?
325                    .ok_or_else(|| original_err.clone())?;
326                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
327            }
328            Err(error) => {
329                self.sync_if_needed(
330                    chain_id,
331                    certificate.round,
332                    certificate.block().header.height,
333                    error,
334                )
335                .await?;
336            }
337            _ => return Ok(result?),
338        }
339        let result = self
340            .remote_node
341            .handle_validated_certificate(certificate)
342            .await;
343        if let Err(err) = &result {
344            self.warn_if_unexpected(err);
345        }
346        Ok(result?)
347    }
348
349    /// Requests a vote for a timeout certificate for the given round from the remote node.
350    ///
351    /// If the remote node is not in that round or at that height yet, sends the chain information
352    /// to update it.
353    async fn request_timeout(
354        &mut self,
355        chain_id: ChainId,
356        round: Round,
357        height: BlockHeight,
358    ) -> Result<Box<ChainInfo>, chain_client::Error> {
359        let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
360        let result = self
361            .remote_node
362            .handle_chain_info_query(query.clone())
363            .await;
364        if let Err(err) = &result {
365            self.sync_if_needed(chain_id, round, height, err).await?;
366            self.warn_if_unexpected(err);
367        }
368        Ok(result?)
369    }
370
371    /// Synchronizes either the local node or the remote node, if one of them is lagging behind.
372    async fn sync_if_needed(
373        &mut self,
374        chain_id: ChainId,
375        round: Round,
376        height: BlockHeight,
377        error: &NodeError,
378    ) -> Result<(), chain_client::Error> {
379        let address = &self.remote_node.address();
380        match error {
381            NodeError::WrongRound(validator_round) if *validator_round > round => {
382                tracing::debug!(
383                    address, %chain_id, %validator_round, %round,
384                    "validator is at a higher round; synchronizing",
385                );
386                self.client
387                    .synchronize_chain_state_from(&self.remote_node, chain_id)
388                    .await?;
389            }
390            NodeError::UnexpectedBlockHeight {
391                expected_block_height,
392                found_block_height,
393            } if expected_block_height > found_block_height => {
394                tracing::debug!(
395                    address,
396                    %chain_id,
397                    %expected_block_height,
398                    %found_block_height,
399                    "validator is at a higher height; synchronizing",
400                );
401                self.client
402                    .synchronize_chain_state_from(&self.remote_node, chain_id)
403                    .await?;
404            }
405            NodeError::WrongRound(validator_round) if *validator_round < round => {
406                tracing::debug!(
407                    address, %chain_id, %validator_round, %round,
408                    "validator is at a lower round; sending chain info",
409                );
410                self.send_chain_information(
411                    chain_id,
412                    height,
413                    CrossChainMessageDelivery::NonBlocking,
414                    None,
415                )
416                .await?;
417            }
418            NodeError::UnexpectedBlockHeight {
419                expected_block_height,
420                found_block_height,
421            } if expected_block_height < found_block_height => {
422                tracing::debug!(
423                    address,
424                    %chain_id,
425                    %expected_block_height,
426                    %found_block_height,
427                    "Validator is at a lower height; sending chain info.",
428                );
429                self.send_chain_information(
430                    chain_id,
431                    height,
432                    CrossChainMessageDelivery::NonBlocking,
433                    None,
434                )
435                .await?;
436            }
437            NodeError::InactiveChain(inactive_chain_id) => {
438                tracing::debug!(
439                    address,
440                    chain_id = %inactive_chain_id,
441                    "Validator has inactive chain; sending chain info.",
442                );
443                self.send_chain_information(
444                    *inactive_chain_id,
445                    height,
446                    CrossChainMessageDelivery::NonBlocking,
447                    None,
448                )
449                .await?;
450            }
451            _ => {}
452        }
453        Ok(())
454    }
455
    /// Sends `proposal` to the validator. Whenever the validator's error shows it is missing
    /// information that can be supplied, that information is sent and the proposal is retried.
    ///
    /// Errors that lead to another attempt:
    /// - `WrongRound`: this chain's information is sent up to the proposal's height.
    /// - `UnexpectedBlockHeight`, when the validator is behind and the found height equals the
    ///   proposal's height: this chain's information is sent up to that height.
    /// - `MissingCrossChainUpdate` for this chain: the origin chain is sent up to and including
    ///   the missing height, with blocking delivery. This happens at most once per origin unless
    ///   a higher height is requested later.
    /// - `EventsNotFound`: the publishing chains not sent before are sent up to the local next
    ///   height to preprocess. If all of them were already sent, the original error is returned.
    /// - `BlobsNotFound` / `InactiveChain` while `blob_ids` is non-empty:
    ///   - the blobs published by the block are sent as pending blobs;
    ///   - the validator is asked which of the remaining `blob_ids` it lacks, and chain
    ///     information for those is sent.
    ///   `blob_ids` is emptied afterwards, so this happens at most once.
    /// - `InvalidTimestamp`: the estimated clock skew is reported through `clock_skew_sender`,
    ///   then the task sleeps until the block timestamp plus that skew.
    ///
    /// Any other error is logged (if unexpected) and returned.
    ///
    /// NOTE(review): the `WrongRound`, `UnexpectedBlockHeight` and `InvalidTimestamp` arms have
    /// no retry bound, so this may keep looping for as long as the validator returns them.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // Highest height already sent per origin chain for `MissingCrossChainUpdate`.
        let mut sent_cross_chain_updates = BTreeMap::new();
        // Publisher chains already sent in response to `EventsNotFound`.
        let mut publisher_chain_ids_sent = BTreeSet::new();
        let storage = self.client.local_node.storage_client();
        loop {
            // Sampled before the request; used below to estimate the clock skew against the
            // validator's reported local time.
            let local_time = storage.clock().current_time();
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                Err(NodeError::WrongRound(_round)) => {
                    // The proposal is for a different round, so we need to update the validator.
                    // TODO: this should probably be more specific as to which rounds are retried.
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    // The proposal is for a later block height, so we need to update the validator.
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // Some received certificates may be missing for this validator
                    // (e.g. to create the chain or make the balance sufficient) so we are going to
                    // synchronize them now and retry.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    // Only publisher chains we have not sent yet; an empty set ends the retries.
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // For `BlobsNotFound`, we assume that the local node should already be
                    // updated with the needed blobs, so sending the chain information about the
                    // certificates that last used the blobs to the validator node should be enough.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // `mem::take` empties `blob_ids`, so this arm's guard fails on later errors.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;

                    tracing::debug!("Sending chains for missing blobs");
                    self.send_chain_info_for_blobs(
                        &missing_blob_ids,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::InvalidTimestamp {
                    block_timestamp,
                    local_time: validator_local_time,
                    ..
                }) => {
                    // The validator's clock is behind the block's timestamp. We need to
                    // wait for two things:
                    // 1. Our clock to reach block_timestamp (in case the block timestamp
                    //    is in the future from our perspective too).
                    // 2. The validator's clock to catch up (in case of clock skew between
                    //    us and the validator).
                    let clock_skew = local_time.delta_since(validator_local_time);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %block_timestamp,
                        ?clock_skew,
                        "validator's clock is behind; waiting and retrying",
                    );
                    // Report the clock skew before sleeping so the caller can aggregate.
                    // Receiver may have been dropped if the caller is no longer interested.
                    clock_skew_sender
                        .send((self.remote_node.public_key, clock_skew))
                        .ok();
                    storage
                        .clock()
                        .sleep_until(block_timestamp.saturating_add(clock_skew))
                        .await;
                }
                // Fail immediately on other errors.
                Err(err) => {
                    self.warn_if_unexpected(&err);
                    return Err(err.into());
                }
            }
        }
    }
632
633    async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
634        let local_admin_info = self
635            .client
636            .local_node
637            .chain_info(self.admin_chain_id)
638            .await?;
639        Box::pin(self.send_chain_information(
640            self.admin_chain_id,
641            local_admin_info.next_block_height,
642            CrossChainMessageDelivery::NonBlocking,
643            None,
644        ))
645        .await
646    }
647
648    /// Sends chain information to bring a validator up to date with a specific chain.
649    ///
650    /// This method performs a two-phase synchronization:
651    /// 1. **Height synchronization**: Ensures the validator has all blocks up to `target_block_height`.
652    /// 2. **Round synchronization**: If heights match, ensures the validator has proposals/certificates
653    ///    for the current consensus round.
654    ///
655    /// # Height Sync Strategy
656    /// - For existing chains (target_block_height > 0):
657    ///   * Optimistically sends the last certificate first (often that's all that's missing).
658    ///   * Falls back to full chain query if the validator needs more context.
659    ///   * Sends any additional missing certificates in order.
660    /// - For new chains (target_block_height == 0):
661    ///   * Sends the chain description and dependencies first.
662    ///   * Then queries the validator's state.
663    ///
664    /// # Round Sync Strategy
665    /// Once heights match, if the local node is at a higher round, sends the evidence
666    /// (proposal, validated block, or timeout certificate) that proves the current round.
667    ///
668    /// # Parameters
669    /// - `chain_id`: The chain to synchronize
670    /// - `target_block_height`: The height the validator should reach
671    /// - `delivery`: Message delivery mode (blocking or non-blocking)
672    /// - `latest_certificate`: Optional certificate at target_block_height - 1 to avoid a storage lookup
673    ///
674    /// # Returns
675    /// - `Ok(())` if synchronization completed successfully or the validator is already up to date
676    /// - `Err` if there was a communication or storage error
677    #[instrument(level = "debug", skip_all, fields(%chain_id))]
678    pub async fn send_chain_information(
679        &mut self,
680        chain_id: ChainId,
681        target_block_height: BlockHeight,
682        delivery: CrossChainMessageDelivery,
683        latest_certificate: Option<Arc<GenericCertificate<ConfirmedBlock>>>,
684    ) -> Result<(), chain_client::Error> {
685        // Phase 1: Height synchronization
686        let info = if target_block_height.0 > 0 {
687            self.sync_chain_height(chain_id, target_block_height, delivery, latest_certificate)
688                .await?
689        } else {
690            self.initialize_new_chain_on_validator(chain_id).await?
691        };
692
693        // Phase 2: Round synchronization (if needed)
694        // Height synchronization is complete. Now check if we need to synchronize
695        // the consensus round at this height.
696        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
697        let query = ChainInfoQuery::new(chain_id).with_manager_values();
698        let local_info = match self.client.local_node.handle_chain_info_query(query).await {
699            Ok(response) => response.info,
700            // If we don't have the full chain description locally, we can't help the
701            // validator with round synchronization. This is not an error - the validator
702            // should retry later once the chain is fully initialized locally.
703            Err(LocalNodeError::BlobsNotFound(_)) => {
704                tracing::debug!("local chain description not fully available, skipping round sync");
705                return Ok(());
706            }
707            Err(error) => return Err(error.into()),
708        };
709
710        let manager = local_info.manager;
711        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
712            return Ok(());
713        }
714
715        // Validator is at our height but behind on consensus round
716        self.sync_consensus_round(remote_round, &manager).await
717    }
718
719    /// Synchronizes a validator to a specific block height by sending missing certificates.
720    ///
721    /// Uses an optimistic approach: sends the last certificate first, then fills in any gaps.
722    ///
723    /// Returns the [`ChainInfo`] from the validator after synchronization.
724    async fn sync_chain_height(
725        &mut self,
726        chain_id: ChainId,
727        target_block_height: BlockHeight,
728        delivery: CrossChainMessageDelivery,
729        latest_certificate: Option<Arc<GenericCertificate<ConfirmedBlock>>>,
730    ) -> Result<Box<ChainInfo>, chain_client::Error> {
731        let height = target_block_height.try_sub_one()?;
732
733        // Get the certificate for the last block we want to send
734        let certificate = if let Some(cert) = latest_certificate {
735            cert
736        } else {
737            self.read_certificates_for_heights(chain_id, vec![height])
738                .await?
739                .into_iter()
740                .next()
741                .ok_or_else(|| {
742                    chain_client::Error::InternalError(
743                        "failed to read latest certificate for height sync",
744                    )
745                })?
746        };
747
748        // Optimistically try sending just the last certificate
749        let info = match self
750            .send_confirmed_certificate(&certificate, delivery)
751            .await
752        {
753            Ok(info) => info,
754            Err(error) => {
755                tracing::debug!(
756                    address = self.remote_node.address(), %error,
757                    "validator failed to handle confirmed certificate; sending whole chain",
758                );
759                let query = ChainInfoQuery::new(chain_id);
760                self.remote_node.handle_chain_info_query(query).await?
761            }
762        };
763
764        // Calculate which block heights the validator is still missing
765        let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
766            .map(BlockHeight)
767            .collect();
768
769        if heights.is_empty() {
770            return Ok(info);
771        }
772
773        let batch_size = self.client.options().certificate_upload_batch_size;
774        for chunk in heights.chunks(batch_size) {
775            let certificates = self
776                .read_certificates_for_heights(chain_id, chunk.to_vec())
777                .await?;
778
779            for certificate in certificates {
780                self.send_confirmed_certificate(&certificate, delivery)
781                    .await?;
782            }
783        }
784
785        Ok(info)
786    }
787
788    /// Reads certificates for the given heights from storage.
789    async fn read_certificates_for_heights(
790        &self,
791        chain_id: ChainId,
792        heights: Vec<BlockHeight>,
793    ) -> Result<Vec<Arc<GenericCertificate<ConfirmedBlock>>>, chain_client::Error> {
794        let storage = self.client.local_node.storage_client();
795
796        let certificates_by_height = storage
797            .read_certificates_by_heights(chain_id, &heights)
798            .await?;
799
800        Ok(certificates_by_height.into_iter().flatten().collect())
801    }
802
803    /// Initializes a new chain on the validator by sending the chain description and dependencies.
804    ///
805    /// This is called when the validator doesn't know about the chain yet.
806    ///
807    /// Returns the [`ChainInfo`] from the validator after initialization.
808    async fn initialize_new_chain_on_validator(
809        &self,
810        chain_id: ChainId,
811    ) -> Result<Box<ChainInfo>, chain_client::Error> {
812        // Send chain description and all dependency chains
813        self.send_chain_info_for_blobs(
814            &[BlobId::new(chain_id.0, BlobType::ChainDescription)],
815            CrossChainMessageDelivery::NonBlocking,
816        )
817        .await?;
818
819        // Query the validator's state for this chain
820        let query = ChainInfoQuery::new(chain_id);
821        let info = self.remote_node.handle_chain_info_query(query).await?;
822        Ok(info)
823    }
824
825    /// Synchronizes the consensus round state with the validator.
826    ///
827    /// If the validator is at the same height but an earlier round, sends the evidence
828    /// (proposal, validated block, or timeout certificate) that justifies the current round.
829    ///
830    /// This is a best-effort operation - failures are logged but don't fail the entire sync.
831    async fn sync_consensus_round(
832        &self,
833        remote_round: Round,
834        manager: &linera_chain::manager::ChainManagerInfo,
835    ) -> Result<(), chain_client::Error> {
836        // Try to send a proposal for the current round
837        for proposal in manager
838            .requested_proposed
839            .iter()
840            .chain(manager.requested_signed_proposal.iter())
841        {
842            if proposal.content.round == manager.current_round {
843                match self
844                    .remote_node
845                    .handle_block_proposal(proposal.clone())
846                    .await
847                {
848                    Ok(_) => {
849                        tracing::debug!("successfully sent block proposal for round sync");
850                        return Ok(());
851                    }
852                    Err(error) => {
853                        tracing::debug!(%error, "failed to send block proposal");
854                    }
855                }
856            }
857        }
858
859        // Try to send a validated block for the current round
860        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.as_deref() {
861            if validated.round == manager.current_round {
862                match self
863                    .remote_node
864                    .handle_optimized_validated_certificate(
865                        validated,
866                        CrossChainMessageDelivery::NonBlocking,
867                    )
868                    .await
869                {
870                    Ok(_) => {
871                        tracing::debug!("successfully sent validated block for round sync");
872                        return Ok(());
873                    }
874                    Err(error) => {
875                        tracing::debug!(%error, "failed to send validated block");
876                    }
877                }
878            }
879        }
880
881        // Try to send a timeout certificate
882        if let Some(cert) = &manager.timeout {
883            if cert.round >= remote_round {
884                match self
885                    .remote_node
886                    .handle_timeout_certificate(cert.as_ref().clone())
887                    .await
888                {
889                    Ok(_) => {
890                        tracing::debug!(round = %cert.round, "successfully sent timeout certificate");
891                        return Ok(());
892                    }
893                    Err(error) => {
894                        tracing::debug!(%error, round = %cert.round, "failed to send timeout certificate");
895                    }
896                }
897            }
898        }
899
900        // If we reach here, either we had no round sync data to send, or all attempts failed.
901        // This is not a fatal error - height sync succeeded which is the primary goal.
902        tracing::debug!("round sync not performed: no applicable data or all attempts failed");
903        Ok(())
904    }
905
906    /// Sends chain information for all chains referenced by the given blobs.
907    ///
908    /// Reads blob states from storage, determines the specific chain heights needed,
909    /// and sends chain information for those heights. With sparse chains, this only
910    /// sends the specific blocks containing the blobs, not all blocks up to those heights.
911    async fn send_chain_info_for_blobs(
912        &self,
913        blob_ids: &[BlobId],
914        delivery: CrossChainMessageDelivery,
915    ) -> Result<(), chain_client::Error> {
916        let blob_states = self
917            .client
918            .local_node
919            .read_blob_states_from_storage(blob_ids)
920            .await?;
921
922        let mut chain_heights: BTreeMap<ChainId, BTreeSet<BlockHeight>> = BTreeMap::new();
923        for blob_state in blob_states {
924            let block_chain_id = blob_state.chain_id;
925            let block_height = blob_state.block_height;
926            chain_heights
927                .entry(block_chain_id)
928                .or_default()
929                .insert(block_height);
930        }
931
932        self.send_chain_info_at_heights(chain_heights, delivery)
933            .await
934    }
935
936    /// Sends chain information for specific heights on multiple chains.
937    ///
938    /// Unlike `send_chain_info_up_to_heights`, this method only sends the blocks at the
939    /// specified heights, not all blocks up to those heights. This is more efficient for
940    /// sparse chains where only specific blocks are needed.
941    async fn send_chain_info_at_heights(
942        &self,
943        chain_heights: impl IntoIterator<Item = (ChainId, BTreeSet<BlockHeight>)>,
944        delivery: CrossChainMessageDelivery,
945    ) -> Result<(), chain_client::Error> {
946        chain_heights
947            .into_iter()
948            .map(|(chain_id, heights)| {
949                let mut updater = self.clone();
950                async move {
951                    // Get all block hashes for this chain at the specified heights in one call
952                    let heights_vec: Vec<_> = heights.into_iter().collect();
953                    let certificates = updater
954                        .client
955                        .local_node
956                        .storage_client()
957                        .read_certificates_by_heights(chain_id, &heights_vec)
958                        .await?
959                        .into_iter()
960                        .flatten()
961                        .collect::<Vec<_>>();
962
963                    // Send each certificate
964                    for certificate in certificates {
965                        updater
966                            .send_confirmed_certificate(&certificate, delivery)
967                            .await?;
968                    }
969
970                    Ok::<_, chain_client::Error>(())
971                }
972            })
973            .collect::<FuturesUnordered<_>>()
974            .try_collect::<Vec<_>>()
975            .await?;
976        Ok(())
977    }
978
979    async fn send_chain_info_up_to_heights(
980        &self,
981        chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
982        delivery: CrossChainMessageDelivery,
983    ) -> Result<(), chain_client::Error> {
984        chain_heights
985            .into_iter()
986            .map(|(chain_id, height)| {
987                let mut updater = self.clone();
988                async move {
989                    updater
990                        .send_chain_information(chain_id, height, delivery, None)
991                        .await
992                }
993            })
994            .collect::<FuturesUnordered<_>>()
995            .try_collect::<Vec<_>>()
996            .await?;
997        Ok(())
998    }
999
1000    pub async fn send_chain_update(
1001        &mut self,
1002        action: CommunicateAction,
1003    ) -> Result<LiteVote, chain_client::Error> {
1004        let chain_id = match &action {
1005            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
1006            CommunicateAction::FinalizeBlock { certificate, .. } => {
1007                certificate.inner().block().header.chain_id
1008            }
1009            CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
1010        };
1011        // Send the block proposal, certificate or timeout request and return a vote.
1012        let vote = match action {
1013            CommunicateAction::SubmitBlock {
1014                proposal,
1015                blob_ids,
1016                clock_skew_sender,
1017            } => {
1018                let info = self
1019                    .send_block_proposal(proposal, blob_ids, clock_skew_sender)
1020                    .await?;
1021                info.manager.pending.ok_or_else(|| {
1022                    NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
1023                })?
1024            }
1025            CommunicateAction::FinalizeBlock {
1026                certificate,
1027                delivery,
1028            } => {
1029                let info = self
1030                    .send_validated_certificate(*certificate, delivery)
1031                    .await?;
1032                info.manager.pending.ok_or_else(|| {
1033                    NodeError::MissingVoteInValidatorResponse("finalize a block".into())
1034                })?
1035            }
1036            CommunicateAction::RequestTimeout { round, height, .. } => {
1037                let info = self.request_timeout(chain_id, round, height).await?;
1038                info.manager.timeout_vote.ok_or_else(|| {
1039                    NodeError::MissingVoteInValidatorResponse("request a timeout".into())
1040                })?
1041            }
1042        };
1043        vote.check(self.remote_node.public_key)?;
1044        Ok(vote)
1045    }
1046}