Skip to main content

linera_core/
updater.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt,
8    hash::Hash,
9    mem,
10    sync::Arc,
11};
12
13use futures::{future, Future, StreamExt};
14use linera_base::{
15    crypto::ValidatorPublicKey,
16    data_types::{BlockHeight, Round, TimeDelta},
17    ensure,
18    identifiers::{BlobId, BlobType, ChainId, StreamId},
19    time::{timer::timeout, Duration, Instant},
20};
21use linera_chain::{
22    data_types::{BlockProposal, LiteVote},
23    manager::LockingBlock,
24    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
25};
26use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME, BlobOrigin};
27use linera_storage::{Arc as CacheArc, Clock, Storage};
28use thiserror::Error;
29use tokio::sync::mpsc;
30use tracing::{instrument, Level};
31
32use crate::{
33    client::{chain_client, Client},
34    data_types::{ChainInfo, ChainInfoQuery},
35    environment::Environment,
36    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
37    remote_node::RemoteNode,
38    LocalNodeError,
39};
40
/// The default amount of time we wait for additional validators to contribute
/// to the result, as a fraction of how long it took to reach a quorum.
///
/// For example, with a fraction of `0.2`, a quorum reached after 10 seconds leaves
/// 2 more seconds for the remaining validators to respond.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;

/// A report of clock skew from a validator, sent before retrying due to `InvalidTimestamp`.
///
/// The pair holds the validator's public key and the difference between the local clock
/// reading and the local time reported by that validator.
pub type ClockSkewReport = (ValidatorPublicKey, TimeDelta);
/// The maximum timeout for requests to a stake-weighted quorum if no quorum is reached.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); // 1 day.
49
/// An action to be performed against validators for a given round.
///
/// Used for `communicate_chain_action`
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal.
    SubmitBlock {
        /// The proposal to submit; its `content.round` is the round of this action.
        proposal: Box<BlockProposal>,
        // NOTE(review): presumably the IDs of the blobs the proposal depends on — confirm
        // against the caller.
        blob_ids: Vec<BlobId>,
        /// Channel to report clock skew before sleeping, so the caller can aggregate reports.
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    },
    /// Finalize a block using a certificate for a validated block.
    FinalizeBlock {
        /// The validated block certificate; its `round` is the round of this action.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain message delivery mode to use.
        delivery: CrossChainMessageDelivery,
    },
    /// Request a timeout for a round at a given height of a chain.
    RequestTimeout {
        /// The chain for which the timeout is requested.
        chain_id: ChainId,
        /// The block height for which the timeout is requested.
        height: BlockHeight,
        /// The round for which the timeout is requested.
        round: Round,
    },
}
69
70impl CommunicateAction {
71    /// The round to which this action pertains.
72    pub fn round(&self) -> Round {
73        match self {
74            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
75            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
76            CommunicateAction::RequestTimeout { round, .. } => *round,
77        }
78    }
79}
80
/// Sends proposals, certificates and chain information to a single validator.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator this updater communicates with.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The client whose local node supplies blobs, certificates and chain information.
    pub client: Arc<Client<Env>>,
    /// The ID of the admin chain, which is sent to the validator when it is missing
    /// epoch events needed for a certificate.
    pub admin_chain_id: ChainId,
}
89
90impl<Env: Environment> Clone for ValidatorUpdater<Env> {
91    fn clone(&self) -> Self {
92        ValidatorUpdater {
93            remote_node: self.remote_node.clone(),
94            client: self.client.clone(),
95            admin_chain_id: self.admin_chain_id,
96        }
97    }
98}
99
/// An error result for requests to a stake-weighted quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No consensus is possible since validators returned different possibilities
    /// for the next block
    ///
    /// The first field is the quorum threshold. The second field has one entry per
    /// distinct response key: the total weight of the validators that returned it and
    /// the number of such validators.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// A single error that was returned by a sufficient number of nodes to be trusted as
    /// valid.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold so we're returning a sample of
    /// errors for debugging purposes, together with their weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
118
119/// Executes a sequence of actions in parallel for all validators.
120///
121/// Tries to stop early when a quorum is reached. If `quorum_grace_period` is specified, other
122/// validators are given additional time to contribute to the result. The grace period is
123/// calculated as a fraction (defaulting to `DEFAULT_QUORUM_GRACE_PERIOD`) of the time taken to
124/// reach quorum.
125pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
126    validator_clients: &'a [RemoteNode<A>],
127    committee: &Committee,
128    group_by: G,
129    execute: F,
130    // Grace period as a fraction of time taken to reach quorum.
131    quorum_grace_period: f64,
132) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
133where
134    A: ValidatorNode + Clone + 'static,
135    F: Clone + Fn(RemoteNode<A>) -> R,
136    R: Future<Output = Result<V, chain_client::Error>> + 'a,
137    G: Fn(&V) -> K,
138    K: Hash + PartialEq + Eq + Clone + 'static,
139    V: 'static,
140{
141    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
142        .iter()
143        .filter_map(|remote_node| {
144            if committee.weight(&remote_node.public_key) == 0 {
145                // This should not happen but better prevent it because certificates
146                // are not allowed to include votes with weight 0.
147                return None;
148            }
149            let execute = execute.clone();
150            let remote_node = remote_node.clone();
151            Some(async move { (remote_node.public_key, execute(remote_node).await) })
152        })
153        .collect();
154
155    let start_time = Instant::now();
156    let mut end_time: Option<Instant> = None;
157    let mut remaining_votes = committee.total_votes();
158    let mut highest_key_score = 0;
159    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
160    let mut error_scores = HashMap::new();
161
162    'vote_wait: while let Ok(Some((name, result))) = timeout(
163        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
164        responses.next(),
165    )
166    .await
167    {
168        remaining_votes -= committee.weight(&name);
169        match result {
170            Ok(value) => {
171                let key = group_by(&value);
172                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
173                entry.0 += committee.weight(&name);
174                entry.1.push((name, value));
175                highest_key_score = highest_key_score.max(entry.0);
176            }
177            Err(err) => {
178                // TODO(#2857): Handle non-remote errors properly.
179                let err = match err {
180                    chain_client::Error::RemoteNodeError(err) => err,
181                    err => NodeError::ResponseHandlingError {
182                        error: err.to_string(),
183                    },
184                };
185                let entry = error_scores.entry(err.clone()).or_insert(0);
186                *entry += committee.weight(&name);
187            }
188        }
189        // If it becomes clear that no key can reach a quorum, break early.
190        if highest_key_score + remaining_votes < committee.quorum_threshold() {
191            break 'vote_wait;
192        }
193
194        // If a key reaches a quorum, wait for the grace period to collect more values
195        // or error information and then stop.
196        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
197            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
198        }
199    }
200
201    let scores = value_scores
202        .values()
203        .map(|(weight, values)| (*weight, values.len()))
204        .collect();
205    // If a key has a quorum, return it with its values.
206    if let Some((key, (_, values))) = value_scores
207        .into_iter()
208        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
209    {
210        return Ok((key, values));
211    }
212
213    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
214    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
215    sample.truncate(4);
216    Err(match sample.as_slice() {
217        [] => CommunicationError::NoConsensus(committee.quorum_threshold(), scores),
218        [(_, score), ..] if *score >= committee.validity_threshold() => {
219            // At least one honest validator returned this error.
220            CommunicationError::Trusted(sample.into_iter().next().unwrap().0)
221        }
222        // Otherwise no specific error is available to report reliably.}
223        _ => CommunicationError::Sample(sample),
224    })
225}
226
227impl<Env> ValidatorUpdater<Env>
228where
229    Env: Environment + 'static,
230{
231    /// Logs a warning if the error is not an expected part of the protocol flow.
232    fn warn_if_unexpected(&self, err: &NodeError) {
233        if !err.is_expected() {
234            tracing::warn!(
235                remote_node = self.remote_node.address(),
236                %err,
237                "unexpected error from validator",
238            );
239        }
240    }
241
    /// Sends a confirmed block certificate to the validator, supplying missing
    /// prerequisites and retrying when the validator reports them.
    ///
    /// The first attempt uses `handle_optimized_confirmed_certificate`; every retry uses
    /// `handle_confirmed_certificate`. Each of the following repairs is attempted at most
    /// once, after which the certificate is sent again:
    /// - `EventsNotFound`, if the certificate is not for the admin chain and all missing
    ///   events are on the admin chain's epoch stream: the admin chain is sent.
    /// - `BlobsNotFound`: the blobs are read from local storage and uploaded.
    /// - `BlocksNotFound`: the certificates with the given hashes are read from local
    ///   storage and sent.
    ///
    /// Any other result is returned as is. If it is an error, a warning is logged unless
    /// the error is an expected one.
    #[instrument(
        level = "trace", skip_all, err(level = Level::DEBUG),
        fields(chain_id = %certificate.block().header.chain_id)
    )]
    async fn send_confirmed_certificate(
        &mut self,
        certificate: &CacheArc<GenericCertificate<ConfirmedBlock>>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let mut result = self
            .remote_node
            .handle_optimized_confirmed_certificate(certificate, delivery)
            .await;

        // Each flag ensures that the corresponding repair is attempted only once; if the
        // same error comes back again, it falls through to the final arm and is returned.
        let mut sent_admin_chain = false;
        let mut sent_blobs = false;
        let mut sent_blocks = false;
        loop {
            match result {
                Err(NodeError::EventsNotFound(event_ids))
                    if !sent_admin_chain
                        && certificate.inner().chain_id() != self.admin_chain_id
                        && event_ids.iter().all(|event_id| {
                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
                                && event_id.chain_id == self.admin_chain_id
                        }) =>
                {
                    // The validator doesn't have the committee that signed the certificate.
                    self.update_admin_chain().await?;
                    sent_admin_chain = true;
                }
                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
                    // The validator is missing the blobs required by the certificate.
                    self.remote_node
                        .check_blobs_not_found(certificate, &blob_ids)?;
                    // The certificate is confirmed, so the blobs must be in storage.
                    let maybe_blobs = self
                        .client
                        .local_node
                        .read_blobs_from_storage(&blob_ids)
                        .await?;
                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
                    self.remote_node
                        .node
                        .upload_blobs(blobs.into_iter().map(|b| b.into_std()).collect())
                        .await?;
                    sent_blobs = true;
                }
                Err(NodeError::BlocksNotFound(hashes)) if !sent_blocks => {
                    // The validator has recorded these hashes as trusted by a
                    // checkpoint cert it verified, but is missing the actual block
                    // bytes. Upload each from local storage; the worker's
                    // trust-mark accept path lets them through regardless of their
                    // (possibly revoked) epoch.
                    let storage = self.client.local_node.storage_client();
                    let certificates = storage.read_certificates(&hashes).await?;
                    for (hash, maybe_cert) in hashes.iter().zip(certificates) {
                        let cert = maybe_cert.ok_or_else(|| {
                            chain_client::Error::ReadCertificatesError(vec![*hash])
                        })?;
                        self.remote_node
                            .handle_confirmed_certificate(cert, delivery)
                            .await?;
                    }
                    sent_blocks = true;
                }
                // Success, an error without a repair, or a repeated error: return it.
                result => {
                    if let Err(err) = &result {
                        self.warn_if_unexpected(err);
                    }
                    return Ok(result?);
                }
            }
            // A repair was made: send the certificate again.
            result = self
                .remote_node
                .handle_confirmed_certificate(certificate.clone(), delivery)
                .await;
        }
    }
321
322    async fn send_validated_certificate(
323        &mut self,
324        certificate: GenericCertificate<ValidatedBlock>,
325        delivery: CrossChainMessageDelivery,
326    ) -> Result<Box<ChainInfo>, chain_client::Error> {
327        let result = self
328            .remote_node
329            .handle_optimized_validated_certificate(&certificate, delivery)
330            .await;
331
332        let chain_id = certificate.inner().chain_id();
333        match &result {
334            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
335                self.remote_node
336                    .check_blobs_not_found(&certificate, blob_ids)?;
337                // The certificate is for a validated block, i.e. for our locking block.
338                // Take the missing blobs from our local chain manager.
339                let blobs = self
340                    .client
341                    .local_node
342                    .get_locking_blobs(blob_ids, chain_id)
343                    .await?
344                    .ok_or_else(|| original_err.clone())?;
345                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
346            }
347            Err(error) => {
348                self.sync_if_needed(
349                    chain_id,
350                    certificate.round,
351                    certificate.block().header.height,
352                    error,
353                )
354                .await?;
355            }
356            _ => return Ok(result?),
357        }
358        let result = self
359            .remote_node
360            .handle_validated_certificate(certificate)
361            .await;
362        if let Err(err) = &result {
363            self.warn_if_unexpected(err);
364        }
365        Ok(result?)
366    }
367
368    /// Requests a vote for a timeout certificate for the given round from the remote node.
369    ///
370    /// If the remote node is not in that round or at that height yet, sends the chain information
371    /// to update it.
372    async fn request_timeout(
373        &mut self,
374        chain_id: ChainId,
375        round: Round,
376        height: BlockHeight,
377    ) -> Result<Box<ChainInfo>, chain_client::Error> {
378        let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
379        let result = self
380            .remote_node
381            .handle_chain_info_query(query.clone())
382            .await;
383        if let Err(err) = &result {
384            self.sync_if_needed(chain_id, round, height, err).await?;
385            self.warn_if_unexpected(err);
386        }
387        Ok(result?)
388    }
389
390    /// Synchronizes either the local node or the remote node, if one of them is lagging behind.
391    async fn sync_if_needed(
392        &mut self,
393        chain_id: ChainId,
394        round: Round,
395        height: BlockHeight,
396        error: &NodeError,
397    ) -> Result<(), chain_client::Error> {
398        let address = &self.remote_node.address();
399        match error {
400            NodeError::WrongRound(validator_round) if *validator_round > round => {
401                tracing::debug!(
402                    address, %chain_id, %validator_round, %round,
403                    "validator is at a higher round; synchronizing",
404                );
405                self.client
406                    .synchronize_chain_state_from(&self.remote_node, chain_id)
407                    .await?;
408            }
409            NodeError::UnexpectedBlockHeight {
410                expected_block_height,
411                found_block_height,
412            } if expected_block_height > found_block_height => {
413                tracing::debug!(
414                    address,
415                    %chain_id,
416                    %expected_block_height,
417                    %found_block_height,
418                    "validator is at a higher height; synchronizing",
419                );
420                self.client
421                    .synchronize_chain_state_from(&self.remote_node, chain_id)
422                    .await?;
423            }
424            NodeError::WrongRound(validator_round) if *validator_round < round => {
425                tracing::debug!(
426                    address, %chain_id, %validator_round, %round,
427                    "validator is at a lower round; sending chain info",
428                );
429                self.send_chain_information(
430                    chain_id,
431                    height,
432                    CrossChainMessageDelivery::NonBlocking,
433                    None,
434                )
435                .await?;
436            }
437            NodeError::UnexpectedBlockHeight {
438                expected_block_height,
439                found_block_height,
440            } if expected_block_height < found_block_height => {
441                tracing::debug!(
442                    address,
443                    %chain_id,
444                    %expected_block_height,
445                    %found_block_height,
446                    "Validator is at a lower height; sending chain info.",
447                );
448                self.send_chain_information(
449                    chain_id,
450                    height,
451                    CrossChainMessageDelivery::NonBlocking,
452                    None,
453                )
454                .await?;
455            }
456            NodeError::InactiveChain(inactive_chain_id) => {
457                tracing::debug!(
458                    address,
459                    chain_id = %inactive_chain_id,
460                    "Validator has inactive chain; sending chain info.",
461                );
462                self.send_chain_information(
463                    *inactive_chain_id,
464                    height,
465                    CrossChainMessageDelivery::NonBlocking,
466                    None,
467                )
468                .await?;
469            }
470            _ => {}
471        }
472        Ok(())
473    }
474
    /// Submits a block proposal to the validator, repairing missing validator state and
    /// retrying until the proposal is accepted or an unrecoverable error occurs.
    ///
    /// The proposal is sent in a loop. Depending on the error the validator returns:
    /// - `WrongRound`: chain information up to the proposal's height is sent.
    /// - `UnexpectedBlockHeight`, if the found height is the proposal's height and is
    ///   higher than the expected one: chain information up to that height is sent.
    /// - `MissingCrossChainUpdate` for the proposal's chain: the origin chain is sent up
    ///   to the height after the missing one, with blocking delivery. This is only
    ///   repeated for the same origin if the reported height has increased.
    /// - `EventsNotFound`: the publisher chains that have not been sent yet are sent; if
    ///   all of them were already sent, the error is returned.
    /// - `ChainError`: the local node tries to synchronize the chain state from the
    ///   validator (failures are only logged), then the error is returned.
    /// - `BlobsNotFound` or `InactiveChain`, while `blob_ids` is non-empty: the blobs
    ///   published by the proposal are sent as pending blobs, and chain information is
    ///   sent for the remaining blobs the validator reports as missing. `blob_ids` is
    ///   emptied in the process, so this happens at most once.
    /// - `InvalidTimestamp`: the clock skew is reported on `clock_skew_sender`, and the
    ///   proposal is retried after sleeping until the block timestamp plus the skew.
    ///
    /// Any other error is returned immediately, after logging a warning if it is not an
    /// expected one.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
        clock_skew_sender: mpsc::UnboundedSender<ClockSkewReport>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // Highest missing height already handled per origin chain, and publisher chains
        // already sent: both prevent repeating the same repair over and over.
        let mut sent_cross_chain_updates = BTreeMap::new();
        let mut publisher_chain_ids_sent = BTreeSet::new();
        let storage = self.client.local_node.storage_client();
        loop {
            // Taken before the request, to compare with the validator's reported time.
            let local_time = storage.clock().current_time();
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                // NOTE(review): nothing in this arm bounds the number of retries.
                Err(NodeError::WrongRound(_round)) => {
                    // The proposal is for a different round, so we need to update the validator.
                    // TODO: this should probably be more specific as to which rounds are retried.
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    // The proposal is for a later block height, so we need to update the validator.
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // Some received certificates may be missing for this validator
                    // (e.g. to create the chain or make the balance sufficient) so we are going to
                    // synchronize them now and retry.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    // Only consider publisher chains that were not sent in a previous iteration.
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    // If every publisher chain was already sent, retrying cannot help.
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(error @ NodeError::ChainError { .. }) => {
                    // The validator rejected the proposal because of its local chain
                    // manager state — most commonly an incompatible confirmed vote tied
                    // to a locking block we don't yet have. Pull manager values from
                    // this validator so the local node absorbs whatever justified the
                    // rejection (signatures are checked locally, so the source can't
                    // fool us), then surface the error. If our local state actually
                    // advanced, `execute_operations` will rebuild and re-propose; if
                    // not, the error propagates as usual.
                    self.warn_if_unexpected(&error);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %error,
                        "validator rejected proposal; pulling manager state",
                    );
                    if let Err(sync_err) = self
                        .client
                        .synchronize_chain_state_from(&self.remote_node, chain_id)
                        .await
                    {
                        tracing::debug!(%sync_err, "failed to pull manager state from validator");
                    }
                    return Err(error.into());
                }
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // For `BlobsNotFound`, we assume that the local node should already be
                    // updated with the needed blobs, so sending the chain information about the
                    // certificates that last used the blobs to the validator node should be enough.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // `mem::take` leaves `blob_ids` empty, so this arm's guard fails on the
                    // next iteration and a repeated error falls through to the last arm.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;

                    tracing::debug!("Sending chains for missing blobs");
                    self.send_chain_info_for_blobs(
                        &missing_blob_ids,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::InvalidTimestamp {
                    block_timestamp,
                    local_time: validator_local_time,
                    ..
                }) => {
                    // The validator's clock is behind the block's timestamp. We need to
                    // wait for two things:
                    // 1. Our clock to reach block_timestamp (in case the block timestamp
                    //    is in the future from our perspective too).
                    // 2. The validator's clock to catch up (in case of clock skew between
                    //    us and the validator).
                    let clock_skew = local_time.delta_since(validator_local_time);
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        %block_timestamp,
                        ?clock_skew,
                        "validator's clock is behind; waiting and retrying",
                    );
                    // Report the clock skew before sleeping so the caller can aggregate.
                    // Receiver may have been dropped if the caller is no longer interested.
                    clock_skew_sender
                        .send((self.remote_node.public_key, clock_skew))
                        .ok();
                    storage
                        .clock()
                        .sleep_until(block_timestamp.saturating_add(clock_skew))
                        .await;
                }
                // Fail immediately on other errors.
                Err(err) => {
                    self.warn_if_unexpected(&err);
                    return Err(err.into());
                }
            }
        }
    }
676
677    async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
678        let local_admin_info = self
679            .client
680            .local_node
681            .chain_info(self.admin_chain_id)
682            .await?;
683        Box::pin(self.send_chain_information(
684            self.admin_chain_id,
685            local_admin_info.next_block_height,
686            CrossChainMessageDelivery::NonBlocking,
687            None,
688        ))
689        .await
690    }
691
692    /// Sends chain information to bring a validator up to date with a specific chain.
693    ///
694    /// This method performs a two-phase synchronization:
695    /// 1. **Height synchronization**: Ensures the validator has all blocks up to `target_block_height`.
696    /// 2. **Round synchronization**: If heights match, ensures the validator has proposals/certificates
697    ///    for the current consensus round.
698    ///
699    /// # Height Sync Strategy
700    /// - For existing chains (target_block_height > 0):
701    ///   * Optimistically sends the last certificate first (often that's all that's missing).
702    ///   * Falls back to full chain query if the validator needs more context.
703    ///   * Sends any additional missing certificates in order.
704    /// - For new chains (target_block_height == 0):
705    ///   * Sends the chain description and dependencies first.
706    ///   * Then queries the validator's state.
707    ///
708    /// # Round Sync Strategy
709    /// Once heights match, if the local node is at a higher round, sends the evidence
710    /// (proposal, validated block, or timeout certificate) that proves the current round.
711    ///
712    /// # Parameters
713    /// - `chain_id`: The chain to synchronize
714    /// - `target_block_height`: The height the validator should reach
715    /// - `delivery`: Message delivery mode (blocking or non-blocking)
716    /// - `latest_certificate`: Optional certificate at target_block_height - 1 to avoid a storage lookup
717    ///
718    /// # Returns
719    /// - `Ok(())` if synchronization completed successfully or the validator is already up to date
720    /// - `Err` if there was a communication or storage error
721    #[instrument(level = "debug", skip_all, fields(%chain_id))]
722    pub async fn send_chain_information(
723        &mut self,
724        chain_id: ChainId,
725        target_block_height: BlockHeight,
726        delivery: CrossChainMessageDelivery,
727        latest_certificate: Option<CacheArc<GenericCertificate<ConfirmedBlock>>>,
728    ) -> Result<(), chain_client::Error> {
729        // Phase 1: Height synchronization
730        let info = if target_block_height.0 > 0 {
731            self.sync_chain_height(chain_id, target_block_height, delivery, latest_certificate)
732                .await?
733        } else {
734            self.initialize_new_chain_on_validator(chain_id).await?
735        };
736
737        // Phase 2: Round synchronization (if needed)
738        // Height synchronization is complete. Now check if we need to synchronize
739        // the consensus round at this height.
740        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
741        let query = ChainInfoQuery::new(chain_id).with_manager_values();
742        let local_info = match self.client.local_node.handle_chain_info_query(query).await {
743            Ok(response) => response.info,
744            // If we don't have the full chain description locally, we can't help the
745            // validator with round synchronization. This is not an error - the validator
746            // should retry later once the chain is fully initialized locally.
747            Err(LocalNodeError::BlobsNotFound(_)) => {
748                tracing::debug!("local chain description not fully available, skipping round sync");
749                return Ok(());
750            }
751            Err(error) => return Err(error.into()),
752        };
753
754        let manager = local_info.manager;
755        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
756            return Ok(());
757        }
758
759        // Validator is at our height but behind on consensus round
760        self.sync_consensus_round(remote_round, &manager).await
761    }
762
763    /// Synchronizes a validator to a specific block height by sending missing certificates.
764    ///
765    /// Uses an optimistic approach: sends the last certificate first, then fills in any gaps.
766    ///
767    /// Returns the [`ChainInfo`] from the validator after synchronization.
768    async fn sync_chain_height(
769        &mut self,
770        chain_id: ChainId,
771        target_block_height: BlockHeight,
772        delivery: CrossChainMessageDelivery,
773        latest_certificate: Option<CacheArc<GenericCertificate<ConfirmedBlock>>>,
774    ) -> Result<Box<ChainInfo>, chain_client::Error> {
775        let height = target_block_height.try_sub_one()?;
776
777        // Get the certificate for the last block we want to send
778        let certificate = if let Some(cert) = latest_certificate {
779            cert
780        } else {
781            self.read_certificates_for_heights(chain_id, vec![height])
782                .await?
783                .into_iter()
784                .next()
785                .ok_or_else(|| {
786                    chain_client::Error::InternalError(
787                        "failed to read latest certificate for height sync",
788                    )
789                })?
790        };
791
792        // Optimistically try sending just the last certificate
793        let info = match self
794            .send_confirmed_certificate(&certificate, delivery)
795            .await
796        {
797            Ok(info) => info,
798            Err(error) => {
799                tracing::debug!(
800                    address = self.remote_node.address(), %error,
801                    "validator failed to handle confirmed certificate; sending whole chain",
802                );
803                let query = ChainInfoQuery::new(chain_id);
804                self.remote_node.handle_chain_info_query(query).await?
805            }
806        };
807
808        // Push a checkpoint if we have one above the validator's tip, to skip past
809        // pre-checkpoint blocks. Mirrors `bootstrap_chain_from_checkpoint`.
810        let info = self
811            .push_checkpoint_if_useful(chain_id, info, delivery)
812            .await?;
813
814        // Calculate which block heights the validator is still missing
815        let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
816            .map(BlockHeight)
817            .collect();
818
819        if heights.is_empty() {
820            return Ok(info);
821        }
822
823        let batch_size = self.client.options().certificate_upload_batch_size;
824        for chunk in heights.chunks(batch_size) {
825            let certificates = self
826                .read_certificates_for_heights(chain_id, chunk.to_vec())
827                .await?;
828
829            for certificate in certificates {
830                self.send_confirmed_certificate(&certificate, delivery)
831                    .await?;
832            }
833        }
834
835        Ok(info)
836    }
837
838    /// Reads certificates for the given heights from storage.
839    async fn read_certificates_for_heights(
840        &self,
841        chain_id: ChainId,
842        heights: Vec<BlockHeight>,
843    ) -> Result<Vec<CacheArc<GenericCertificate<ConfirmedBlock>>>, chain_client::Error> {
844        let storage = self.client.local_node.storage_client();
845
846        let certificates_by_height = storage
847            .read_certificates_by_heights(chain_id, &heights)
848            .await?;
849
850        Ok(certificates_by_height.into_iter().flatten().collect())
851    }
852
853    /// If we hold a checkpoint at a height the validator hasn't reached yet, pushes
854    /// the checkpoint certificate so the validator can install our chain's execution
855    /// state without replaying every pre-checkpoint block. The worker's first attempt
856    /// will report any pre-checkpoint sender blocks it doesn't yet have via
857    /// `BlocksNotFound`, which `send_confirmed_certificate` then uploads before
858    /// retrying. Returns the validator's chain info after the push (unchanged if
859    /// there's nothing useful to push).
860    async fn push_checkpoint_if_useful(
861        &mut self,
862        chain_id: ChainId,
863        info: Box<ChainInfo>,
864        delivery: CrossChainMessageDelivery,
865    ) -> Result<Box<ChainInfo>, chain_client::Error> {
866        let local_query = ChainInfoQuery::new(chain_id).with_latest_checkpoint_height();
867        let local_info = self
868            .client
869            .local_node
870            .handle_chain_info_query(local_query)
871            .await?
872            .info;
873        let Some(checkpoint_height) = local_info.requested_latest_checkpoint_height else {
874            return Ok(info);
875        };
876        if checkpoint_height < info.next_block_height {
877            return Ok(info);
878        }
879        let Some(checkpoint_cert) = self
880            .read_certificates_for_heights(chain_id, vec![checkpoint_height])
881            .await?
882            .into_iter()
883            .next()
884        else {
885            return Ok(info);
886        };
887        self.send_confirmed_certificate(&checkpoint_cert, delivery)
888            .await
889    }
890
891    /// Initializes a new chain on the validator by sending the chain description and dependencies.
892    ///
893    /// This is called when the validator doesn't know about the chain yet.
894    ///
895    /// Returns the [`ChainInfo`] from the validator after initialization.
896    async fn initialize_new_chain_on_validator(
897        &self,
898        chain_id: ChainId,
899    ) -> Result<Box<ChainInfo>, chain_client::Error> {
900        // Send chain description and all dependency chains
901        self.send_chain_info_for_blobs(
902            &[BlobId::new(chain_id.0, BlobType::ChainDescription)],
903            CrossChainMessageDelivery::NonBlocking,
904        )
905        .await?;
906
907        // Query the validator's state for this chain
908        let query = ChainInfoQuery::new(chain_id);
909        let info = self.remote_node.handle_chain_info_query(query).await?;
910        Ok(info)
911    }
912
913    /// Synchronizes the consensus round state with the validator.
914    ///
915    /// If the validator is at the same height but an earlier round, sends the evidence
916    /// (proposal, validated block, or timeout certificate) that justifies the current round.
917    ///
918    /// This is a best-effort operation - failures are logged but don't fail the entire sync.
919    async fn sync_consensus_round(
920        &self,
921        remote_round: Round,
922        manager: &linera_chain::manager::ChainManagerInfo,
923    ) -> Result<(), chain_client::Error> {
924        let target_round = manager.current_round;
925
926        // First, push the locking certificate if it justifies our current round. A
927        // locking block from an earlier round is not enough on its own to advance the
928        // remote: the remote may still be ahead via a timeout or signed proposal, and
929        // pushing a stale lock would not move them. Push only the current-round lock.
930        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.as_deref() {
931            if validated.round == target_round {
932                match self
933                    .remote_node
934                    .handle_optimized_validated_certificate(
935                        validated,
936                        CrossChainMessageDelivery::NonBlocking,
937                    )
938                    .await
939                {
940                    Ok(info) => {
941                        tracing::debug!("successfully sent validated block for round sync");
942                        if info.manager.current_round >= target_round {
943                            return Ok(());
944                        }
945                    }
946                    Err(error) => {
947                        tracing::debug!(%error, "failed to send validated block");
948                    }
949                }
950            }
951        }
952
953        // Try to send a timeout certificate. The remote applies `next_round(cert.round)`
954        // to its current round, which (for the cert we hold) lands at our current round.
955        if let Some(cert) = &manager.timeout {
956            if cert.round >= remote_round {
957                match self
958                    .remote_node
959                    .handle_timeout_certificate(cert.as_ref().clone())
960                    .await
961                {
962                    Ok(info) => {
963                        tracing::debug!(round = %cert.round, "successfully sent timeout certificate");
964                        if info.manager.current_round >= target_round {
965                            return Ok(());
966                        }
967                    }
968                    Err(error) => {
969                        tracing::debug!(%error, round = %cert.round, "failed to send timeout certificate");
970                    }
971                }
972            }
973        }
974
975        // Finally, try to push a proposal at the current round.
976        for proposal in manager
977            .requested_proposed
978            .iter()
979            .chain(manager.requested_signed_proposal.iter())
980        {
981            if proposal.content.round == target_round {
982                match self
983                    .remote_node
984                    .handle_block_proposal(proposal.clone())
985                    .await
986                {
987                    Ok(info) => {
988                        tracing::debug!("successfully sent block proposal for round sync");
989                        if info.manager.current_round >= target_round {
990                            return Ok(());
991                        }
992                    }
993                    Err(error) => {
994                        tracing::debug!(%error, "failed to send block proposal");
995                    }
996                }
997            }
998        }
999
1000        // If we reach here, either we had no round sync data to send, or all attempts failed.
1001        // This is not a fatal error - height sync succeeded which is the primary goal.
1002        tracing::debug!("round sync not performed: no applicable data or all attempts failed");
1003        Ok(())
1004    }
1005
1006    /// Sends chain information for all chains referenced by the given blobs.
1007    ///
1008    /// Reads blob states from storage, determines the specific chain heights needed,
1009    /// and sends chain information for those heights. With sparse chains, this only
1010    /// sends the specific blocks containing the blobs, not all blocks up to those heights.
1011    async fn send_chain_info_for_blobs(
1012        &self,
1013        blob_ids: &[BlobId],
1014        delivery: CrossChainMessageDelivery,
1015    ) -> Result<(), chain_client::Error> {
1016        let blob_states = self
1017            .client
1018            .local_node
1019            .read_blob_states_from_storage(blob_ids)
1020            .await?;
1021
1022        let mut chain_heights: BTreeMap<ChainId, BTreeSet<BlockHeight>> = BTreeMap::new();
1023        for blob_state in blob_states {
1024            match blob_state.origin {
1025                // Genesis blobs aren't published by any block; the recipient has
1026                // them from its own genesis config. Nothing to ship.
1027                BlobOrigin::Genesis => continue,
1028                BlobOrigin::Published {
1029                    chain_id,
1030                    block_height,
1031                } => {
1032                    chain_heights
1033                        .entry(chain_id)
1034                        .or_default()
1035                        .insert(block_height);
1036                }
1037            }
1038        }
1039
1040        self.send_chain_info_at_heights(chain_heights, delivery)
1041            .await
1042    }
1043
1044    /// Sends chain information for specific heights on multiple chains.
1045    ///
1046    /// Unlike `send_chain_info_up_to_heights`, this method only sends the blocks at the
1047    /// specified heights, not all blocks up to those heights. This is more efficient for
1048    /// sparse chains where only specific blocks are needed.
1049    async fn send_chain_info_at_heights(
1050        &self,
1051        chain_heights: impl IntoIterator<Item = (ChainId, BTreeSet<BlockHeight>)>,
1052        delivery: CrossChainMessageDelivery,
1053    ) -> Result<(), chain_client::Error> {
1054        future::try_join_all(chain_heights.into_iter().map(|(chain_id, heights)| {
1055            let mut updater = self.clone();
1056            async move {
1057                // Get all block hashes for this chain at the specified heights in one call
1058                let heights_vec = heights.into_iter().collect::<Vec<_>>();
1059                let certificates = updater
1060                    .client
1061                    .local_node
1062                    .storage_client()
1063                    .read_certificates_by_heights(chain_id, &heights_vec)
1064                    .await?
1065                    .into_iter()
1066                    .flatten()
1067                    .collect::<Vec<_>>();
1068
1069                // Send each certificate
1070                for certificate in certificates {
1071                    updater
1072                        .send_confirmed_certificate(&certificate, delivery)
1073                        .await?;
1074                }
1075
1076                Ok::<_, chain_client::Error>(())
1077            }
1078        }))
1079        .await?;
1080        Ok(())
1081    }
1082
1083    async fn send_chain_info_up_to_heights(
1084        &self,
1085        chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
1086        delivery: CrossChainMessageDelivery,
1087    ) -> Result<(), chain_client::Error> {
1088        future::try_join_all(chain_heights.into_iter().map(|(chain_id, height)| {
1089            let mut updater = self.clone();
1090            async move {
1091                updater
1092                    .send_chain_information(chain_id, height, delivery, None)
1093                    .await
1094            }
1095        }))
1096        .await?;
1097        Ok(())
1098    }
1099
1100    pub async fn send_chain_update(
1101        &mut self,
1102        action: CommunicateAction,
1103    ) -> Result<LiteVote, chain_client::Error> {
1104        let chain_id = match &action {
1105            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
1106            CommunicateAction::FinalizeBlock { certificate, .. } => {
1107                certificate.inner().block().header.chain_id
1108            }
1109            CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
1110        };
1111        // Send the block proposal, certificate or timeout request and return a vote.
1112        let vote = match action {
1113            CommunicateAction::SubmitBlock {
1114                proposal,
1115                blob_ids,
1116                clock_skew_sender,
1117            } => {
1118                let info = self
1119                    .send_block_proposal(proposal, blob_ids, clock_skew_sender)
1120                    .await?;
1121                info.manager.pending.ok_or_else(|| {
1122                    NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
1123                })?
1124            }
1125            CommunicateAction::FinalizeBlock {
1126                certificate,
1127                delivery,
1128            } => {
1129                let info = self
1130                    .send_validated_certificate(*certificate, delivery)
1131                    .await?;
1132                info.manager.pending.ok_or_else(|| {
1133                    NodeError::MissingVoteInValidatorResponse("finalize a block".into())
1134                })?
1135            }
1136            CommunicateAction::RequestTimeout { round, height, .. } => {
1137                let info = self.request_timeout(chain_id, round, height).await?;
1138                info.manager.timeout_vote.ok_or_else(|| {
1139                    NodeError::MissingVoteInValidatorResponse("request a timeout".into())
1140                })?
1141            }
1142        };
1143        vote.check(self.remote_node.public_key)?;
1144        Ok(vote)
1145    }
1146}