Skip to main content

linera_core/client/
mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::Ordering,
7    collections::{BTreeMap, BTreeSet, HashSet},
8    slice,
9    sync::{Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14    future::{Future, TryFutureExt as _},
15    stream::{self, AbortHandle, FuturesOrdered, FuturesUnordered, StreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20    crypto::{CryptoHash, Signer as _, ValidatorPublicKey},
21    data_types::{
22        ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, Round, TimeDelta, Timestamp,
23    },
24    ensure,
25    hashed::Hashed,
26    identifiers::{AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
27    time::Duration,
28};
29#[cfg(not(target_arch = "wasm32"))]
30use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
31use linera_chain::{
32    data_types::{
33        BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, ChainAndHeight, LiteVote,
34        ProposedBlock,
35    },
36    manager::LockingBlock,
37    types::{
38        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
39        LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
40    },
41    ChainError, ChainIdSet,
42};
43use linera_execution::{committee::Committee, ExecutionError};
44use linera_storage::{Arc as CacheArc, Clock as _, ResultReadCertificates, Storage as _};
45use rand::seq::SliceRandom;
46use received_log::ReceivedLogs;
47use serde::{Deserialize, Serialize};
48use tokio::sync::mpsc;
49use tracing::{debug, error, info, instrument, trace, warn};
50
51use crate::{
52    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
53    environment::Environment,
54    local_node::{LocalNodeClient, LocalNodeError},
55    node::{CrossChainMessageDelivery, NodeError, ValidatorNode as _, ValidatorNodeProvider as _},
56    notifier::{ChannelNotifier, Notifier as _},
57    remote_node::RemoteNode,
58    updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
59    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
60    ChainWorkerConfig, ProcessConfirmedBlockMode, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
61};
62
63pub mod chain_client;
64pub use chain_client::ChainClient;
65
66pub use crate::data_types::ClientOutcome;
67
68#[cfg(test)]
69#[path = "../unit_tests/client_tests.rs"]
70mod client_tests;
71pub mod requests_scheduler;
72
73pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
74mod received_log;
75mod validator_trackers;
76
// Lazily registered latency histograms; this module is only compiled when the `with_metrics`
// cfg is set. Every histogram below is registered without labels and uses the buckets produced
// by `exponential_bucket_latencies(10_000.0)`.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram registered under the name `process_inbox_latency`.
    ///
    /// NOTE(review): the static is named "without prepare" while the registered metric name is
    /// not — confirm the mismatch is intentional before renaming either side.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "process_inbox_latency",
                "process_inbox latency",
                &[],
                exponential_bucket_latencies(10_000.0),
            )
        });

    /// Histogram registered under the name `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "prepare_chain_latency",
            "prepare_chain latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });

    /// Histogram registered under the name `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });

    /// Histogram registered under the name `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "execute_block_latency",
            "execute_block latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });

    /// Histogram registered under the name `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "find_received_certificates_latency",
            "find_received_certificates latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });
}
130
/// Default number of certificates requested per download batch (presumably the default for
/// `certificate_download_batch_size` in `chain_client::Options` — confirm there).
pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default number of certificates per upload batch, going by the name — TODO confirm usage.
pub static DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE: usize = 500;
/// Default batch size for sender-chain certificate downloads, going by the name — TODO confirm
/// usage.
pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
/// Default cap on event stream queries, going by the name — TODO confirm usage.
pub static DEFAULT_MAX_EVENT_STREAM_QUERIES: usize = 1000;
/// Default number of certificate batches downloaded concurrently (presumably the default for
/// `max_concurrent_batch_downloads` in `chain_client::Options` — confirm there).
pub static DEFAULT_MAX_CONCURRENT_BATCH_DOWNLOADS: usize = 1;
136
/// Identifies which phase a timing measurement belongs to.
///
/// Values of this type are sent, paired with a `u64`, over the optional `timing_sender` channel
/// accepted by `Client::create_chain_client`. NOTE(review): the unit of the `u64` is not visible
/// here — confirm at the sending site.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    /// Presumably the time spent executing operations — TODO confirm at the sending site.
    ExecuteOperations,
    /// Presumably the time spent executing a block — TODO confirm at the sending site.
    ExecuteBlock,
    /// Presumably the time spent submitting a block proposal — TODO confirm at the sending site.
    SubmitBlockProposal,
    /// Presumably the time spent updating validators — TODO confirm at the sending site.
    UpdateValidators,
}
144
/// Defines what type of notifications we should process for a chain:
/// - do we fully participate in consensus and download sender chains?
/// - or do we only follow the chain's blocks without participating?
/// - or do we only care about blocks containing events from some particular streams?
///
/// Modes are only partially ordered (see the `PartialOrd` implementation): `FullChain` is above
/// `FollowChain`, which is above every `EventsOnly`, while two `EventsOnly` modes compare by set
/// inclusion of their streams and are incomparable when neither set contains the other.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Listen to everything: all blocks for the chain and all blocks from sender chains,
    /// and participate in rounds.
    FullChain,
    /// Listen to all blocks for the chain, but don't download sender chain blocks or participate
    /// in rounds. Use this when interested in the chain's state but not intending to propose
    /// blocks (e.g., because we're not a chain owner).
    FollowChain,
    /// Only listen to blocks which contain events from those streams.
    EventsOnly(BTreeSet<StreamId>),
}
161
162impl PartialOrd for ListeningMode {
163    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
164        match (self, other) {
165            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
166            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
167            (_, ListeningMode::FullChain) => Some(Ordering::Less),
168            (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
169            (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
170            (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
171            (ListeningMode::EventsOnly(a), ListeningMode::EventsOnly(b)) => {
172                if a == b {
173                    Some(Ordering::Equal)
174                } else if a.is_superset(b) {
175                    Some(Ordering::Greater)
176                } else if b.is_superset(a) {
177                    Some(Ordering::Less)
178                } else {
179                    None
180                }
181            }
182        }
183    }
184}
185
186impl ListeningMode {
187    /// Returns whether a notification with this reason should be processed under this listening
188    /// mode.
189    pub fn is_relevant(&self, reason: &Reason) -> bool {
190        match (reason, self) {
191            // NewEvents is only processed in EventsOnly mode, the other modes depend on
192            // the NewBlock notification.
193            (Reason::NewEvents { .. }, ListeningMode::FollowChain | ListeningMode::FullChain) => {
194                false
195            }
196            // FullChain processes everything.
197            (_, ListeningMode::FullChain) => true,
198            // FollowChain processes new blocks on the chain itself, including blocks that
199            // produced events.
200            (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
201            (_, ListeningMode::FollowChain) => false,
202            // EventsOnly only processes events from relevant streams.
203            (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
204                relevant.intersection(event_streams).next().is_some()
205            }
206            (_, ListeningMode::EventsOnly(_)) => false,
207        }
208    }
209
210    pub fn extend(&mut self, other: Option<ListeningMode>) {
211        match (self, other) {
212            (_, None) => (),
213            (ListeningMode::FullChain, _) => (),
214            (mode, Some(ListeningMode::FullChain)) => {
215                *mode = ListeningMode::FullChain;
216            }
217            (ListeningMode::FollowChain, _) => (),
218            (mode, Some(ListeningMode::FollowChain)) => {
219                *mode = ListeningMode::FollowChain;
220            }
221            (
222                ListeningMode::EventsOnly(self_events),
223                Some(ListeningMode::EventsOnly(other_events)),
224            ) => {
225                self_events.extend(other_events);
226            }
227        }
228    }
229
230    /// Returns whether this mode implies follow-only behavior (i.e., not participating in
231    /// consensus rounds).
232    pub fn is_follow_only(&self) -> bool {
233        !matches!(self, ListeningMode::FullChain)
234    }
235
236    /// Returns whether this is a full chain mode (synchronizing sender chains and updating
237    /// inboxes).
238    pub fn is_full(&self) -> bool {
239        matches!(self, ListeningMode::FullChain)
240    }
241
242    pub fn should_sync_chain_state(&self) -> bool {
243        match self {
244            ListeningMode::FullChain | ListeningMode::FollowChain => true,
245            ListeningMode::EventsOnly(_) => false,
246        }
247    }
248}
249
/// The per-chain [`ListeningMode`]s tracked by a local node, together with a memoized
/// [`Hashed`] of the fully-tracked subset.
///
/// The hash is the version of the outbox index (see
/// [`ChainStateView::outbox_index_tracked_hash`]). Caching it here — recomputed only when a chain
/// newly becomes `FullChain`, which is rare and client-side — lets every cross-chain operation
/// compare and reconcile the index in `O(1)` instead of rehashing the tracked set each time.
///
/// [`ChainStateView::outbox_index_tracked_hash`]: linera_chain::ChainStateView
#[derive(Debug)]
pub struct ChainModes {
    /// The listening mode of every tracked chain, keyed by chain ID.
    modes: BTreeMap<ChainId, ListeningMode>,
    /// The hashed set of chains whose mode is `FullChain`, shared behind an `Arc` so that
    /// handing it out is a cheap clone.
    full: Arc<Hashed<ChainIdSet>>,
}
264
265impl Default for ChainModes {
266    fn default() -> Self {
267        Self::new(BTreeMap::new())
268    }
269}
270
271impl ChainModes {
272    /// Builds the listening modes from `modes`, computing the tracked-set hash once.
273    pub fn new(modes: BTreeMap<ChainId, ListeningMode>) -> Self {
274        let full = Self::compute_full(&modes);
275        Self { modes, full }
276    }
277
278    fn compute_full(modes: &BTreeMap<ChainId, ListeningMode>) -> Arc<Hashed<ChainIdSet>> {
279        Arc::new(Hashed::new(ChainIdSet(
280            modes
281                .iter()
282                .filter(|(_, mode)| mode.is_full())
283                .map(|(id, _)| *id)
284                .collect(),
285        )))
286    }
287
288    /// The fully-tracked chains together with their memoized hash (`O(1)` clone).
289    pub fn full(&self) -> Arc<Hashed<ChainIdSet>> {
290        self.full.clone()
291    }
292
293    /// Returns the listening mode for `chain_id`, if it is tracked.
294    pub fn get(&self, chain_id: &ChainId) -> Option<&ListeningMode> {
295        self.modes.get(chain_id)
296    }
297
298    /// Merges `mode` into the entry for `chain_id` — monotonic in the listening-mode order, so it
299    /// never weakens an existing entry — and returns the resulting mode. The tracked-set hash is
300    /// recomputed only if the chain newly became `FullChain`.
301    pub fn extend_mode(&mut self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
302        let entry = self
303            .modes
304            .entry(chain_id)
305            .or_insert_with(|| ListeningMode::EventsOnly(BTreeSet::new()));
306        let was_full = entry.is_full();
307        entry.extend(Some(mode));
308        let result = entry.clone();
309        if !was_full && result.is_full() {
310            self.full = Self::compute_full(&self.modes);
311        }
312        result
313    }
314}
315
/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
pub struct Client<Env: Environment> {
    /// The environment, which gives access to the storage, network, signer and wallet.
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Manages the requests sent to validator nodes.
    requests_scheduler: Arc<RequestsScheduler<Env>>,
    /// The admin chain ID.
    admin_chain_id: ChainId,
    /// Chains that should be tracked by the client, along with their listening mode.
    /// The presence of a chain in this map means it is tracked by the local node.
    chain_modes: Arc<RwLock<ChainModes>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains.
    chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Configuration options.
    options: chain_client::Options,
}
336
337impl<Env: Environment> Client<Env> {
338    /// Creates a new `Client` with a new cache and notifiers.
339    #[instrument(level = "trace", skip_all)]
340    #[expect(clippy::too_many_arguments)]
341    pub fn new(
342        environment: Env,
343        admin_chain_id: ChainId,
344        long_lived_services: bool,
345        chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
346        name: impl Into<String>,
347        chain_worker_ttl: Option<Duration>,
348        sender_chain_worker_ttl: Option<Duration>,
349        cross_chain_batch_size_limit: usize,
350        options: chain_client::Options,
351        block_cache_size: usize,
352        execution_state_cache_size: usize,
353        requests_scheduler_config: &requests_scheduler::RequestsSchedulerConfig,
354    ) -> Self {
355        let mut modes = chain_modes.into_iter().collect::<BTreeMap<_, _>>();
356        // The client needs the admin chain fully synced for epoch tracking, so
357        // promote it (or insert) into `FullChain`. `extend` is monotonic in the
358        // listening mode order, so this never weakens an existing entry.
359        modes
360            .entry(admin_chain_id)
361            .or_insert(ListeningMode::FullChain)
362            .extend(Some(ListeningMode::FullChain));
363        let chain_modes = Arc::new(RwLock::new(ChainModes::new(modes)));
364        let config = ChainWorkerConfig {
365            nickname: name.into(),
366            long_lived_services,
367            allow_inactive_chains: true,
368            ttl: chain_worker_ttl,
369            sender_chain_ttl: sender_chain_worker_ttl,
370            block_cache_size,
371            execution_state_cache_size,
372            cross_chain_batch_size_limit,
373            ..ChainWorkerConfig::default()
374        };
375        let state = WorkerState::new(
376            environment.storage().clone(),
377            config,
378            Some(chain_modes.clone()),
379        );
380        let local_node = LocalNodeClient::new(state);
381        let requests_scheduler =
382            Arc::new(RequestsScheduler::new(vec![], requests_scheduler_config));
383
384        Self {
385            environment,
386            local_node,
387            requests_scheduler,
388            chains: papaya::HashMap::new(),
389            admin_chain_id,
390            chain_modes,
391            notifier: Arc::new(ChannelNotifier::default()),
392            options,
393        }
394    }
395
    /// Returns the chain ID of the admin chain.
    ///
    /// This is the ID that was passed to [`Client::new`].
    pub fn admin_chain_id(&self) -> ChainId {
        self.admin_chain_id
    }
400
    /// Subscribes to notifications for the given chain IDs.
    ///
    /// Delegates to the shared notifier and returns the receiving half of an unbounded channel
    /// of [`Notification`]s.
    pub fn subscribe(
        &self,
        chain_ids: Vec<ChainId>,
    ) -> tokio::sync::mpsc::UnboundedReceiver<Notification> {
        self.notifier.subscribe(chain_ids)
    }
408
    /// Adds additional chain IDs to an existing subscription.
    ///
    /// `sender` is the sending half of the channel that should also receive notifications for
    /// `chain_ids`. NOTE(review): presumably this is the sender backing a receiver obtained
    /// earlier — confirm against `ChannelNotifier::add_sender`.
    pub fn subscribe_extra(
        &self,
        chain_ids: Vec<ChainId>,
        sender: &tokio::sync::mpsc::UnboundedSender<Notification>,
    ) {
        self.notifier.add_sender(chain_ids, sender);
    }
417
    /// Returns the storage client used by this client's local node.
    ///
    /// The reference is borrowed from the environment.
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }
422
423    /// Tries to read a certificate from local storage, using the hash if available
424    /// (fast path) or falling back to a height-based lookup.
425    async fn try_read_local_certificate(
426        &self,
427        chain_id: ChainId,
428        height: BlockHeight,
429        hash: Option<CryptoHash>,
430    ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, chain_client::Error> {
431        if let Some(hash) = hash {
432            return Ok(self.storage_client().read_certificate(hash).await?);
433        }
434        let results = self
435            .storage_client()
436            .read_certificates_by_heights(chain_id, &[height])
437            .await?;
438        Ok(results.into_iter().next().flatten())
439    }
440
    /// Returns the environment's network handle, which serves as the validator node provider.
    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }
444
    /// Returns the configuration options this client was created with.
    pub(crate) fn options(&self) -> &chain_client::Options {
        &self.options
    }
448
    /// Handles any pending local cross-chain requests, notifying subscribers.
    ///
    /// Delegates to the local node for `sender_chain`, passing it this client's notifier.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LocalNodeError`] the local node reports.
    pub async fn retry_pending_cross_chain_requests(
        &self,
        sender_chain: ChainId,
    ) -> Result<(), LocalNodeError> {
        self.local_node
            .retry_pending_cross_chain_requests(sender_chain, &self.notifier)
            .await
    }
458
    /// Returns a reference to the client's [`Signer`][crate::environment::Signer].
    ///
    /// The signer is owned by the environment.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &Env::Signer {
        self.environment.signer()
    }
464
    /// Returns whether the signer has a key for the given owner.
    ///
    /// # Errors
    ///
    /// A failure reported by the signer is converted with
    /// `chain_client::Error::signer_failure`.
    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, chain_client::Error> {
        self.signer()
            .contains_key(owner)
            .await
            .map_err(chain_client::Error::signer_failure)
    }
472
    /// Returns a reference to the client's [`Wallet`][crate::environment::Wallet].
    ///
    /// The wallet is owned by the environment.
    pub fn wallet(&self) -> &Env::Wallet {
        self.environment.wallet()
    }
477
    /// Extends the listening mode for a chain, combining with the existing mode if present.
    /// Returns the resulting mode.
    ///
    /// Takes the write lock on the shared chain modes for the duration of the update.
    ///
    /// # Panics
    ///
    /// Panics if the `chain_modes` lock is poisoned.
    #[instrument(level = "trace", skip(self))]
    pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
        self.chain_modes
            .write()
            .expect("Panics should not happen while holding a lock to `chain_modes`")
            .extend_mode(chain_id, mode)
    }
487
    /// Returns the listening mode for a chain, if it is tracked.
    ///
    /// The mode is cloned out while holding the read lock, so the result is a snapshot.
    ///
    /// # Panics
    ///
    /// Panics if the `chain_modes` lock is poisoned.
    pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
        self.chain_modes
            .read()
            .expect("Panics should not happen while holding a lock to `chain_modes`")
            .get(&chain_id)
            .cloned()
    }
496
    /// Returns whether a chain is fully tracked by the local node.
    ///
    /// This is true only if the chain has an entry whose mode is `FullChain`; chains in
    /// `FollowChain` or `EventsOnly` mode, and chains without an entry, yield `false`.
    ///
    /// # Panics
    ///
    /// Panics if the `chain_modes` lock is poisoned.
    pub fn is_tracked(&self, chain_id: ChainId) -> bool {
        self.chain_modes
            .read()
            .expect("Panics should not happen while holding a lock to `chain_modes`")
            .get(&chain_id)
            .is_some_and(ListeningMode::is_full)
    }
505
506    /// Creates a new `ChainClient`.
507    #[expect(clippy::too_many_arguments)]
508    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
509    pub fn create_chain_client(
510        self: &Arc<Self>,
511        chain_id: ChainId,
512        block_hash: Option<CryptoHash>,
513        next_block_height: BlockHeight,
514        pending_proposal: &Option<PendingProposal>,
515        preferred_owner: Option<AccountOwner>,
516        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
517        follow_only: bool,
518    ) -> ChainClient<Env> {
519        // If the entry already exists we assume that the entry is more up to date than
520        // the arguments: If they were read from the wallet file, they might be stale.
521        self.chains.pin().get_or_insert_with(chain_id, || {
522            chain_client::State::new(pending_proposal.clone(), follow_only)
523        });
524
525        ChainClient::new(
526            self.clone(),
527            chain_id,
528            self.options.clone(),
529            block_hash,
530            next_block_height,
531            preferred_owner,
532            timing_sender,
533        )
534    }
535
    /// Returns whether the given chain is in follow-only mode.
    ///
    /// Reads the flag from the per-chain state; a chain without a state entry yields `false`.
    fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
        self.chains
            .pin()
            .get(&chain_id)
            .is_some_and(|state| state.is_follow_only())
    }
543
    /// Sets whether the given chain is in follow-only mode.
    ///
    /// Replaces the chain's state entry with a copy carrying the new flag.
    /// NOTE(review): presumably a no-op when the chain has no state entry — confirm against
    /// the `update` semantics of the `papaya` map.
    pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
        self.chains
            .pin()
            .update(chain_id, |state| state.with_follow_only(follow_only));
    }
550
551    /// Fetches the chain description blob if needed, and returns the chain info.
552    async fn fetch_chain_info(
553        &self,
554        chain_id: ChainId,
555        validators: &[RemoteNode<Env::ValidatorNode>],
556    ) -> Result<Box<ChainInfo>, chain_client::Error> {
557        match self.local_node.chain_info(chain_id).await {
558            Ok(info) => Ok(info),
559            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
560                // Make sure the admin chain is up to date so we can validate the
561                // certificate that creates this chain's description blob.
562                self.synchronize_chain_state(self.admin_chain_id).await?;
563                self.update_local_node_with_blobs_from(blob_ids, validators)
564                    .await?;
565                Ok(self.local_node.chain_info(chain_id).await?)
566            }
567            Err(err) => Err(err.into()),
568        }
569    }
570
571    /// Downloads and processes all certificates up to (excluding) the specified height.
572    #[instrument(level = "trace", skip(self))]
573    async fn download_certificates(
574        &self,
575        chain_id: ChainId,
576        target_next_block_height: BlockHeight,
577    ) -> Result<Box<ChainInfo>, chain_client::Error> {
578        let validators = self.validator_nodes().await?;
579        let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
580        if target_next_block_height <= info.next_block_height {
581            return Ok(info);
582        }
583        info = self
584            .load_local_certificates(chain_id, target_next_block_height, None)
585            .await?;
586        let mut next_height = info.next_block_height;
587        // Download remaining batches using all validators with staggered fallback.
588        while next_height < target_next_block_height {
589            let limit = u64::from(target_next_block_height)
590                .checked_sub(u64::from(next_height))
591                .ok_or(ArithmeticError::Overflow)?
592                .min(self.options.certificate_download_batch_size);
593            let certificates = self
594                .requests_scheduler
595                .download_certificates_from_validators(
596                    &validators,
597                    chain_id,
598                    next_height,
599                    limit,
600                    self.options.certificate_batch_download_timeout,
601                )
602                .await?;
603            let Some(new_info) = self
604                .process_certificates(
605                    &validators,
606                    certificates,
607                    None,
608                    ProcessConfirmedBlockMode::Execute,
609                )
610                .await?
611            else {
612                break;
613            };
614            assert!(new_info.next_block_height > next_height);
615            next_height = new_info.next_block_height;
616            info = new_info;
617        }
618        ensure!(
619            target_next_block_height <= info.next_block_height,
620            chain_client::Error::CannotDownloadCertificates {
621                chain_id,
622                target_next_block_height,
623            }
624        );
625        Ok(info)
626    }
627
628    /// Loads and processes certificates from local storage for the given chain, from the
629    /// current local height up to `end`. Returns the chain info after processing.
630    /// If `until_block_time` is `Some`, stops before processing any certificate whose
631    /// block timestamp is >= the given value (exclusive).
632    async fn load_local_certificates(
633        &self,
634        chain_id: ChainId,
635        end: BlockHeight,
636        until_block_time: Option<Timestamp>,
637    ) -> Result<Box<ChainInfo>, chain_client::Error> {
638        let mut last_info = self.local_node.chain_info(chain_id).await?;
639        let next_height = last_info.next_block_height;
640        let hashes = self
641            .local_node
642            .get_preprocessed_block_hashes(chain_id, next_height, end)
643            .await?;
644        let certificates = self.storage_client().read_certificates(&hashes).await?;
645        let certificates = match ResultReadCertificates::new(certificates, hashes) {
646            ResultReadCertificates::Certificates(certificates) => certificates,
647            ResultReadCertificates::InvalidHashes(hashes) => {
648                return Err(chain_client::Error::ReadCertificatesError(hashes))
649            }
650        };
651        for certificate in certificates {
652            if let Some(until) = until_block_time {
653                if certificate.value().block().header.timestamp >= until {
654                    break;
655                }
656            }
657            last_info = self.handle_certificate(certificate).await?.info;
658        }
659        Ok(last_info)
660    }
661
662    /// Downloads and processes certificates from the given validator.
663    ///
664    /// Stops when either condition is met:
665    /// - `stop`: the local chain has reached that height (exclusive).
666    /// - `until_block_time`: the next block's timestamp is >= that value (exclusive).
667    #[instrument(level = "trace", skip_all)]
668    async fn download_certificates_from(
669        &self,
670        remote_node: &RemoteNode<Env::ValidatorNode>,
671        chain_id: ChainId,
672        stop: BlockHeight,
673        until_block_time: Option<Timestamp>,
674    ) -> Result<Box<ChainInfo>, chain_client::Error> {
675        let mut last_info = self
676            .load_local_certificates(chain_id, stop, until_block_time)
677            .await?;
678        let mut next_height = last_info.next_block_height;
679
680        if next_height >= stop {
681            return Ok(last_info);
682        }
683
684        // Download remaining certificates from the remote node using a pipelined
685        // sliding window. A background task downloads up to `max_concurrent_batch_downloads`
686        // batches concurrently and sends them through a channel for sequential processing.
687        #[cfg(not(web))]
688        type CertificateBatchFuture = std::pin::Pin<
689            Box<dyn Future<Output = Result<Vec<ConfirmedBlockCertificate>, NodeError>> + Send>,
690        >;
691        #[cfg(web)]
692        type CertificateBatchFuture = std::pin::Pin<
693            Box<dyn Future<Output = Result<Vec<ConfirmedBlockCertificate>, NodeError>>>,
694        >;
695
696        let max_concurrent = self.options.max_concurrent_batch_downloads;
697        let batch_size = self.options.certificate_download_batch_size;
698        let (sender, mut receiver) = tokio::sync::mpsc::channel(max_concurrent);
699        let scheduler = self.requests_scheduler.clone();
700        let remote = remote_node.clone();
701
702        let download_task = linera_base::Task::spawn(async move {
703            let mut download_height = next_height;
704            let mut in_flight = FuturesOrdered::<CertificateBatchFuture>::new();
705
706            let try_enqueue = |in_flight: &mut FuturesOrdered<CertificateBatchFuture>,
707                               download_height: &mut BlockHeight| {
708                if *download_height >= stop {
709                    return;
710                }
711                let limit = u64::from(stop)
712                    .saturating_sub(u64::from(*download_height))
713                    .min(batch_size);
714                let height = *download_height;
715                let scheduler = scheduler.clone();
716                let remote = remote.clone();
717                in_flight.push_back(Box::pin(async move {
718                    scheduler
719                        .download_certificates(&remote, chain_id, height, limit)
720                        .await
721                }));
722                *download_height = BlockHeight(u64::from(*download_height) + limit);
723            };
724
725            while in_flight.len() < max_concurrent && download_height < stop {
726                try_enqueue(&mut in_flight, &mut download_height);
727            }
728
729            while let Some(result) = in_flight.next().await {
730                if sender.send(result).await.is_err() {
731                    break;
732                }
733                try_enqueue(&mut in_flight, &mut download_height);
734            }
735        });
736
737        // Process downloaded batches sequentially.
738        while let Some(result) = receiver.recv().await {
739            let certificates = result?;
740            let Some(info) = self
741                .process_certificates(
742                    slice::from_ref(remote_node),
743                    certificates,
744                    until_block_time,
745                    ProcessConfirmedBlockMode::Execute,
746                )
747                .await?
748            else {
749                break;
750            };
751            assert!(info.next_block_height >= next_height);
752            next_height = info.next_block_height;
753            last_info = info;
754        }
755        // Await the downloader so any panic inside the spawned task surfaces here
756        // instead of being silently swallowed when the channel closes.
757        download_task.await;
758        Ok(last_info)
759    }
760
761    async fn download_blobs(
762        &self,
763        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
764        blob_ids: &[BlobId],
765    ) -> Result<(), chain_client::Error> {
766        let blobs = &self
767            .requests_scheduler
768            .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
769            .await?
770            .ok_or_else(|| {
771                chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
772            })?;
773        self.local_node.store_blobs(blobs).await.map_err(Into::into)
774    }
775
776    /// Downloads the publisher chain certificates that contain the given events,
777    /// using the event block height index on validators. Queries a validator for
778    /// the block heights, downloads those certificates, and processes them — all
779    /// as one atomic unit per validator attempt, with staggered fallback.
780    #[instrument(level = "trace", skip_all)]
781    pub(crate) async fn download_certificates_for_events(
782        &self,
783        event_ids: &[EventId],
784    ) -> Result<(), chain_client::Error> {
785        let mut validators = self.validator_nodes().await?;
786        let timeout = self.options.certificate_batch_download_timeout;
787        let mut remaining_event_ids = event_ids.to_vec();
788
789        while !remaining_event_ids.is_empty() {
790            let remaining_ref = &remaining_event_ids;
791            validators.shuffle(&mut rand::thread_rng());
792            let result = communicate_concurrently(
793                &validators,
794                move |remote_node| {
795                    let validator_key = remote_node.public_key;
796                    let validator_address = remote_node.address();
797                    Box::pin(async move {
798                        // Query this validator for the block heights.
799                        let heights = remote_node
800                            .node
801                            .event_block_heights(remaining_ref.to_vec())
802                            .await?;
803
804                        // Separate resolved and unresolved events.
805                        let mut chain_heights = BTreeMap::<_, BTreeSet<_>>::new();
806                        let mut expected_events = BTreeMap::<_, HashSet<EventId>>::new();
807                        let mut unresolved = Vec::new();
808                        for (event_id, maybe_height) in remaining_ref.iter().zip(heights) {
809                            if let Some(height) = maybe_height {
810                                chain_heights
811                                    .entry(event_id.chain_id)
812                                    .or_default()
813                                    .insert(height);
814                                expected_events
815                                    .entry((event_id.chain_id, height))
816                                    .or_default()
817                                    .insert(event_id.clone());
818                            } else {
819                                unresolved.push(event_id.clone());
820                            }
821                        }
822                        if chain_heights.is_empty() {
823                            // This validator has no useful information.
824                            return Err(chain_client::Error::from(NodeError::EventsNotFound(remaining_ref.clone())));
825                        }
826
827                        // Download certificates and verify them.
828                        let mut checked_certificates = Vec::<ConfirmedBlockCertificate>::new();
829                        for (chain_id, heights) in chain_heights {
830                            let heights_vec = heights.into_iter().collect::<Vec<_>>();
831                            let certificates = self
832                                .requests_scheduler
833                                .download_certificates_by_heights(
834                                    &remote_node,
835                                    chain_id,
836                                    heights_vec,
837                                )
838                                .await?;
839                            for cert in &certificates {
840                                // Verify the block contains the expected events.
841                                let block = cert.block();
842                                let block_event_ids = block.event_ids().collect::<HashSet<_>>();
843                                if let Some(expected_event_ids) =
844                                    expected_events.get(&(chain_id, block.header.height))
845                                {
846                                    if !expected_event_ids.is_subset(&block_event_ids) {
847                                        tracing::debug!(
848                                            %validator_address, ?expected_event_ids, ?block_event_ids,
849                                            "validator lied about events in block."
850                                        );
851                                        return Err(NodeError::UnexpectedCertificateValue.into());
852                                    }
853                                }
854                            }
855                            for cert in certificates {
856                                self.check_certificate(&cert)
857                                    .await
858                                    .map_err(|error| {
859                                        tracing::debug!(
860                                            %validator_address, %error,
861                                            "invalid certificate"
862                                        );
863                                        error
864                                    })?
865                                    .into_result()
866                                    .map_err(|error| {
867                                        tracing::debug!(
868                                            %validator_address, %error,
869                                            "could not check certificate"
870                                        );
871                                        error
872                                    })?;
873                                checked_certificates.push(cert);
874                            }
875                        }
876                        Ok((checked_certificates, unresolved, validator_key))
877                    })
878                },
879                timeout,
880            )
881            .await;
882
883            match result {
884                Ok((certificates, unresolved, validator_key)) => {
885                    for certificate in certificates {
886                        let mode = ReceiveCertificateMode::AlreadyChecked;
887                        self.receive_sender_certificate(
888                            self.storage_client().cache_certificate(certificate),
889                            mode,
890                            None,
891                        )
892                        .await?;
893                    }
894                    validators.retain(|node| node.public_key != validator_key);
895                    remaining_event_ids = unresolved;
896                }
897                Err(errors) => {
898                    for (validator, error) in &errors {
899                        warn!(
900                            %validator,
901                            %error,
902                            "failed to download event certificates from validator",
903                        );
904                    }
905                    // All validators failed; no point retrying.
906                    return Err(NodeError::EventsNotFound(remaining_event_ids).into());
907                }
908            }
909        }
910        Ok(())
911    }
912
913    /// Downloads the checkpoint certificate at `checkpoint_height` from `remote_node`
914    /// and processes it locally, if our chain isn't already past that height. The
915    /// worker's `process_confirmed_block` recognises the gap-plus-checkpoint case and
916    /// installs the chain's execution state from the checkpoint blob before re-running
917    /// the certificate.
918    ///
919    /// The certificate's signatures are still verified against the committee resolved
920    /// from the admin chain's epoch event stream, so the remote node is trusted only
921    /// to point us at a height — not to forge the snapshot itself.
922    #[instrument(level = "trace", skip_all)]
923    async fn bootstrap_chain_from_checkpoint(
924        &self,
925        remote_node: &RemoteNode<Env::ValidatorNode>,
926        chain_id: ChainId,
927        checkpoint_height: BlockHeight,
928    ) -> Result<(), chain_client::Error> {
929        let local_next = match self.local_node.chain_info(chain_id).await {
930            Ok(info) => info.next_block_height,
931            // A freshly-created follower whose storage doesn't yet hold this
932            // chain's description blob: treat as height 0 and let the
933            // checkpoint cert install the snapshot. The chain description is
934            // the only blob `chain_info` ever needs.
935            Err(LocalNodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
936            Err(err) => return Err(err.into()),
937        };
938        if local_next > checkpoint_height {
939            return Ok(());
940        }
941        let certificates = remote_node
942            .download_certificates_by_heights(chain_id, vec![checkpoint_height])
943            .await?;
944        if certificates.is_empty() {
945            // The validator advertised a checkpoint height it can't actually serve;
946            // skip and let the regular sync path take over.
947            return Ok(());
948        }
949        // The first attempt at processing the checkpoint cert will fall into the
950        // worker's `BlocksNotFound` pre-check if pre-checkpoint sender blocks are
951        // missing; `handle_certificate_with_retry` downloads them by hash and
952        // retries, so by the time `process_certificates` returns the chain has
953        // both its restored state and every certified sender block in storage.
954        self.process_certificates(
955            slice::from_ref(remote_node),
956            certificates,
957            None,
958            ProcessConfirmedBlockMode::Execute,
959        )
960        .await?;
961        Ok(())
962    }
963
964    /// Tries to process all the certificates, requesting any missing blobs from the given nodes.
965    /// Returns the chain info of the last successfully processed certificate.
966    /// If `until_block_time` is `Some`, stops before processing any certificate whose
967    /// block timestamp is greater or equal than the given value.
968    #[instrument(level = "trace", skip_all)]
969    async fn process_certificates(
970        &self,
971        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
972        certificates: Vec<ConfirmedBlockCertificate>,
973        until_block_time: Option<Timestamp>,
974        mode: ProcessConfirmedBlockMode,
975    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
976        let mut info = None;
977        // Blobs created by these certs are already embedded in the downloaded
978        // block bodies, so they don't need to be fetched from a validator. The
979        // chain worker resolves them from `Block::created_blobs()` during
980        // `handle_certificate`.
981        let created_blob_ids = certificates
982            .iter()
983            .flat_map(|certificate| certificate.value().block().created_blob_ids())
984            .collect::<BTreeSet<BlobId>>();
985        let required_blob_ids = certificates
986            .iter()
987            .flat_map(|certificate| certificate.value().required_blob_ids())
988            .filter(|blob_id| !created_blob_ids.contains(blob_id))
989            .collect::<Vec<_>>();
990
991        match self
992            .local_node
993            .read_blob_states_from_storage(&required_blob_ids)
994            .await
995        {
996            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
997                self.download_blobs(remote_nodes, &blob_ids).await?;
998            }
999            x => {
1000                x?;
1001            }
1002        }
1003
1004        for certificate in certificates {
1005            if let Some(until) = until_block_time {
1006                if certificate.value().block().header.timestamp >= until {
1007                    break;
1008                }
1009            }
1010            let response = self
1011                .handle_certificate_with_retry(&certificate, remote_nodes, mode)
1012                .await?;
1013            info = Some(response.info);
1014        }
1015
1016        Ok(info)
1017    }
1018
1019    /// Calls `handle_confirmed_certificate`, retrying with any missing blobs (downloaded
1020    /// from `nodes`) and any missing events (downloaded from the publisher
1021    /// chains via the current validators).
1022    async fn handle_certificate_with_retry(
1023        &self,
1024        certificate: &ConfirmedBlockCertificate,
1025        nodes: &[RemoteNode<Env::ValidatorNode>],
1026        mode: ProcessConfirmedBlockMode,
1027    ) -> Result<ChainInfoResponse, chain_client::Error> {
1028        let mut downloaded_blobs = HashSet::<BlobId>::new();
1029        let mut downloaded_blocks = HashSet::<CryptoHash>::new();
1030        let mut events = EventSetDownloader::new(self);
1031        loop {
1032            let result = self
1033                .handle_confirmed_certificate(certificate.clone(), mode)
1034                .await;
1035            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1036                let new_blobs = filter_new(blob_ids, &downloaded_blobs);
1037                if !new_blobs.is_empty() {
1038                    self.download_blobs(nodes, &new_blobs).await?;
1039                    downloaded_blobs.extend(new_blobs);
1040                    continue;
1041                }
1042            }
1043            if let Err(LocalNodeError::BlocksNotFound(hashes)) = &result {
1044                let new_blocks = filter_new(hashes, &downloaded_blocks);
1045                if !new_blocks.is_empty() {
1046                    self.download_pre_checkpoint_blocks(nodes, &new_blocks)
1047                        .await?;
1048                    downloaded_blocks.extend(new_blocks);
1049                    continue;
1050                }
1051            }
1052            if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
1053                if events.download_new(event_ids).await? {
1054                    continue;
1055                }
1056            }
1057            return Ok(result?);
1058        }
1059    }
1060
1061    /// Downloads each missing pre-checkpoint sender block from `nodes` and feeds it
1062    /// through the local worker. The worker's trust-mark accept path verifies the
1063    /// cert against its own epoch's committee and writes it to storage. Routing
1064    /// through `handle_certificate_with_retry` ensures the sender block's own
1065    /// blob/event dependencies (e.g. a `ChainDescription` blob or an admin-chain
1066    /// epoch event for a revoked epoch) get resolved before the cert is accepted.
1067    async fn download_pre_checkpoint_blocks(
1068        &self,
1069        nodes: &[RemoteNode<Env::ValidatorNode>],
1070        hashes: &[CryptoHash],
1071    ) -> Result<(), chain_client::Error> {
1072        for hash in hashes {
1073            let mut last_error = None;
1074            for node in nodes {
1075                match node.node.download_certificate(*hash).await {
1076                    Ok(certificate) => {
1077                        Box::pin(self.handle_certificate_with_retry(
1078                            &certificate,
1079                            nodes,
1080                            ProcessConfirmedBlockMode::Auto,
1081                        ))
1082                        .await?;
1083                        last_error = None;
1084                        break;
1085                    }
1086                    Err(error) => last_error = Some(error),
1087                }
1088            }
1089            if let Some(error) = last_error {
1090                return Err(error.into());
1091            }
1092        }
1093        Ok(())
1094    }
1095
1096    async fn handle_certificate<T: ProcessableCertificate>(
1097        &self,
1098        certificate: GenericCertificate<T>,
1099    ) -> Result<ChainInfoResponse, LocalNodeError> {
1100        self.local_node
1101            .handle_certificate(certificate, &self.notifier)
1102            .await
1103    }
1104
1105    async fn handle_confirmed_certificate(
1106        &self,
1107        certificate: ConfirmedBlockCertificate,
1108        mode: ProcessConfirmedBlockMode,
1109    ) -> Result<ChainInfoResponse, LocalNodeError> {
1110        self.local_node
1111            .handle_confirmed_certificate(certificate, mode, &self.notifier)
1112            .await
1113    }
1114
1115    /// Obtains the committee for the latest epoch on the admin chain.
1116    pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
1117        let info = self.local_node.chain_info(self.admin_chain_id).await?;
1118        let hash = info
1119            .committee_hash
1120            .ok_or(LocalNodeError::InactiveChain(self.admin_chain_id))?;
1121        let committee = self
1122            .storage_client()
1123            .get_or_load_committee_by_hash(hash)
1124            .await?;
1125        Ok((info.epoch, committee))
1126    }
1127
1128    /// Obtains the validators for the latest epoch.
1129    async fn validator_nodes(
1130        &self,
1131    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
1132        let (_, committee) = self.admin_committee().await?;
1133        Ok(self.make_nodes(&committee)?)
1134    }
1135
1136    /// Creates a [`RemoteNode`] for each validator in the committee.
1137    fn make_nodes(
1138        &self,
1139        committee: &Committee,
1140    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
1141        Ok(self
1142            .validator_node_provider()
1143            .make_nodes(committee)?
1144            .map(|(public_key, node)| RemoteNode { public_key, node })
1145            .collect())
1146    }
1147
1148    /// Ensures that the client has the `ChainDescription` blob corresponding to this
1149    /// client's `ChainId`, and returns the chain description blob.
1150    pub async fn get_chain_description_blob(
1151        &self,
1152        chain_id: ChainId,
1153    ) -> Result<Arc<Blob>, chain_client::Error> {
1154        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
1155        let blob = self
1156            .local_node
1157            .storage_client()
1158            .read_blob(chain_desc_id)
1159            .await?;
1160        if let Some(blob) = blob {
1161            // We have the blob - return it.
1162            return Ok(blob.into_std());
1163        }
1164        // Recover history from the current validators, according to the admin chain.
1165        self.synchronize_chain_state(self.admin_chain_id).await?;
1166        let nodes = self.validator_nodes().await?;
1167        Ok(self
1168            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
1169            .await?
1170            .pop()
1171            .unwrap() // Returns exactly as many blobs as passed-in IDs.
1172            .into_std())
1173    }
1174
1175    /// Ensures that the client has the `ChainDescription` blob corresponding to this
1176    /// client's `ChainId`, and returns the chain description.
1177    pub async fn get_chain_description(
1178        &self,
1179        chain_id: ChainId,
1180    ) -> Result<ChainDescription, chain_client::Error> {
1181        let blob = self.get_chain_description_blob(chain_id).await?;
1182        Ok(bcs::from_bytes(blob.bytes())?)
1183    }
1184
1185    /// Submits a validated block for finalization and returns the confirmed block certificate.
1186    #[instrument(level = "trace", skip_all)]
1187    pub(crate) async fn finalize_block(
1188        self: &Arc<Self>,
1189        committee: &Committee,
1190        certificate: ValidatedBlockCertificate,
1191    ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
1192        debug!(round = %certificate.round, "Submitting block for confirmation");
1193        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
1194        let finalize_action = CommunicateAction::FinalizeBlock {
1195            certificate: Box::new(certificate),
1196            delivery: self.options.cross_chain_message_delivery,
1197        };
1198        let certificate = self
1199            .communicate_chain_action(committee, finalize_action, hashed_value)
1200            .await?;
1201        self.receive_certificate_with_checked_signatures(
1202            certificate.clone(),
1203            ProcessConfirmedBlockMode::Execute,
1204        )
1205        .await?;
1206        Ok(certificate)
1207    }
1208
1209    /// Submits a block proposal to the validators.
1210    #[instrument(level = "trace", skip_all)]
1211    async fn submit_block_proposal<T: ProcessableCertificate>(
1212        self: &Arc<Self>,
1213        committee: Arc<Committee>,
1214        proposal: Box<BlockProposal>,
1215        value: T,
1216    ) -> Result<GenericCertificate<T>, chain_client::Error> {
1217        debug!(
1218            round = %proposal.content.round,
1219            "Submitting block proposal to validators"
1220        );
1221
1222        // Check if the block timestamp is in the future and log INFO.
1223        let block_timestamp = proposal.content.block.timestamp;
1224        let local_time = self.local_node.storage_client().clock().current_time();
1225        if block_timestamp > local_time {
1226            info!(
1227                chain_id = %proposal.content.block.chain_id,
1228                %block_timestamp,
1229                %local_time,
1230                "Block timestamp is in the future; waiting until it can be proposed",
1231            );
1232        }
1233
1234        // Create channel for clock skew reports from validators.
1235        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
1236        let submit_action = CommunicateAction::SubmitBlock {
1237            proposal,
1238            blob_ids: value.required_blob_ids().into_iter().collect(),
1239            clock_skew_sender,
1240        };
1241
1242        // Spawn a task to monitor clock skew reports and warn if threshold is reached.
1243        let validity_threshold = committee.validity_threshold();
1244        let committee_clone = committee.clone();
1245        let clock_skew_check_handle = linera_base::Task::spawn(async move {
1246            let mut skew_weight = 0u64;
1247            let mut min_skew = TimeDelta::MAX;
1248            let mut max_skew = TimeDelta::ZERO;
1249            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
1250                if clock_skew.as_micros() > 0 {
1251                    skew_weight += committee_clone.weight(&public_key);
1252                    min_skew = min_skew.min(clock_skew);
1253                    max_skew = max_skew.max(clock_skew);
1254                    if skew_weight >= validity_threshold {
1255                        warn!(
1256                            skew_weight,
1257                            validity_threshold,
1258                            min_skew_ms = min_skew.as_micros() / 1000,
1259                            max_skew_ms = max_skew.as_micros() / 1000,
1260                            "A validity threshold of validators reported clock skew; \
1261                             consider checking your system clock",
1262                        );
1263                        return;
1264                    }
1265                }
1266            }
1267        });
1268
1269        let certificate = self
1270            .communicate_chain_action(&committee, submit_action, value)
1271            .await?;
1272
1273        clock_skew_check_handle.await;
1274
1275        self.handle_certificate(certificate.clone()).await?;
1276        Ok(certificate)
1277    }
1278
1279    /// Broadcasts certified blocks to validators.
1280    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
1281    async fn communicate_chain_updates(
1282        self: &Arc<Self>,
1283        committee: &Committee,
1284        chain_id: ChainId,
1285        height: BlockHeight,
1286        delivery: CrossChainMessageDelivery,
1287        latest_certificate: Option<CacheArc<GenericCertificate<ConfirmedBlock>>>,
1288    ) -> Result<(), chain_client::Error> {
1289        let nodes = self.make_nodes(committee)?;
1290        communicate_with_quorum(
1291            &nodes,
1292            committee,
1293            |_: &()| (),
1294            |remote_node| {
1295                let mut updater = ValidatorUpdater {
1296                    remote_node,
1297                    client: self.clone(),
1298                    admin_chain_id: self.admin_chain_id,
1299                };
1300                let certificate = latest_certificate.clone();
1301                Box::pin(async move {
1302                    updater
1303                        .send_chain_information(chain_id, height, delivery, certificate)
1304                        .await
1305                })
1306            },
1307            self.options.quorum_grace_period,
1308        )
1309        .await?;
1310        Ok(())
1311    }
1312
1313    /// Broadcasts certified blocks and optionally a block proposal, certificate or
1314    /// leader timeout request.
1315    ///
1316    /// In that case, it verifies that the validator votes are for the provided value,
1317    /// and returns a certificate.
1318    #[instrument(level = "trace", skip_all)]
1319    async fn communicate_chain_action<T: CertificateValue>(
1320        self: &Arc<Self>,
1321        committee: &Committee,
1322        action: CommunicateAction,
1323        value: T,
1324    ) -> Result<GenericCertificate<T>, chain_client::Error> {
1325        let nodes = self.make_nodes(committee)?;
1326        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
1327            &nodes,
1328            committee,
1329            |vote: &LiteVote| (vote.value.value_hash, vote.round),
1330            |remote_node| {
1331                let mut updater = ValidatorUpdater {
1332                    remote_node,
1333                    client: self.clone(),
1334                    admin_chain_id: self.admin_chain_id,
1335                };
1336                let action = action.clone();
1337                Box::pin(async move { updater.send_chain_update(action).await })
1338            },
1339            self.options.quorum_grace_period,
1340        )
1341        .await?;
1342        ensure!(
1343            (votes_hash, votes_round) == (value.hash(), action.round()),
1344            chain_client::Error::UnexpectedQuorum {
1345                hash: votes_hash,
1346                round: votes_round,
1347                expected_hash: value.hash(),
1348                expected_round: action.round(),
1349            }
1350        );
1351        // Certificate is valid because
1352        // * `communicate_with_quorum` ensured a sufficient "weight" of
1353        // (non-error) answers were returned by validators.
1354        // * each answer is a vote signed by the expected validator.
1355        let certificate = LiteCertificate::try_from_votes(votes)
1356            .ok_or_else(|| {
1357                chain_client::Error::InternalError(
1358                    "Vote values or rounds don't match; this is a bug",
1359                )
1360            })?
1361            .with_value(value)
1362            .ok_or_else(|| {
1363                chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
1364            })?;
1365        Ok(certificate)
1366    }
1367
1368    /// Processes the confirmed block certificate in the local node without checking signatures.
1369    /// Also downloads and processes all ancestors that are still missing.
1370    #[instrument(level = "trace", skip_all)]
1371    async fn receive_certificate_with_checked_signatures(
1372        &self,
1373        certificate: ConfirmedBlockCertificate,
1374        mode: ProcessConfirmedBlockMode,
1375    ) -> Result<(), chain_client::Error> {
1376        let block = certificate.block();
1377        // Recover history from the network.
1378        self.download_certificates(block.header.chain_id, block.header.height)
1379            .await?;
1380        // Process the received operations. Download required hashed certificate values if
1381        // necessary.
1382        let nodes = self.validator_nodes().await?;
1383        self.handle_certificate_with_retry(&certificate, &nodes, mode)
1384            .await?;
1385        Ok(())
1386    }
1387
1388    /// Processes the confirmed block in the local node. Whether it is fully
1389    /// executed or only preprocessed depends on the chain's [`ListeningMode`]:
1390    /// chains we follow get executed, untracked or events-only chains are only
1391    /// preprocessed.
1392    #[instrument(level = "trace", skip_all)]
1393    async fn receive_sender_certificate(
1394        &self,
1395        certificate: CacheArc<ConfirmedBlockCertificate>,
1396        mode: ReceiveCertificateMode,
1397        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
1398    ) -> Result<(), chain_client::Error> {
1399        // Verify the certificate before doing any expensive networking.
1400        if let ReceiveCertificateMode::NeedsCheck = mode {
1401            self.check_certificate(&certificate).await?.into_result()?;
1402        }
1403        // Recover history from the network.
1404        let nodes = if let Some(nodes) = nodes {
1405            nodes
1406        } else {
1407            self.validator_nodes().await?
1408        };
1409        let processing_mode = if self
1410            .chain_mode(certificate.value().chain_id())
1411            .is_some_and(|m| m.should_sync_chain_state())
1412        {
1413            ProcessConfirmedBlockMode::Auto
1414        } else {
1415            ProcessConfirmedBlockMode::Preprocess
1416        };
1417        self.handle_certificate_with_retry(&certificate, &nodes, processing_mode)
1418            .await?;
1419
1420        Ok(())
1421    }
1422
    /// Downloads and processes certificates for sender chain blocks.
    ///
    /// Loops until `remote_heights` is empty or no validator is left in the working copy
    /// of `nodes`. Each round:
    ///
    /// 1. Looks the outstanding heights up in local storage; every height found there is
    ///    reported through `sender` and dropped from the queue.
    /// 2. Calls `communicate_concurrently` over the remaining validators. For each
    ///    validator only the heights that `received_log` attributes to it are requested,
    ///    and every downloaded certificate is passed through `check_certificate`.
    /// 3. Hands certificates that passed the check to `receive_sender_certificate` and,
    ///    on success, reports them through `sender`.
    ///
    /// Validators for which `communicate_concurrently` reports an error are removed before
    /// the next round. Nothing is returned; failures are only logged.
    #[instrument(level = "debug", skip_all, fields(chain_id = %sender_chain_id))]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        // Owned copy, so that faulty validators can be dropped between rounds.
        let mut nodes = nodes.to_vec();
        while !remote_heights.is_empty() {
            // Check local storage first — certificates may already be available from
            // a prior sync cycle, another receiver chain, or a concurrent notification.
            // A storage read error is ignored here: the heights are then simply requested
            // from the validators below.
            if let Ok(local_certs) = self
                .storage_client()
                .read_certificates_by_heights(sender_chain_id, &remote_heights)
                .await
            {
                let mut still_needed = Vec::new();
                // NOTE(review): `zip` stops at the shorter input, so this presumably relies
                // on `read_certificates_by_heights` returning exactly one entry per
                // requested height — confirm.
                for (height, maybe_cert) in remote_heights.iter().copied().zip(local_certs) {
                    if let Some(certificate) = maybe_cert {
                        let chain_id = certificate.block().header.chain_id;
                        if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                            error!(
                                %chain_id, %height, %error,
                                "failed to send chain and height over the channel",
                            );
                        }
                    } else {
                        still_needed.push(height);
                    }
                }
                remote_heights = still_needed;
                if remote_heights.is_empty() {
                    break;
                }
            }

            // Shared reference so that the `async move` closure below does not take
            // ownership of the queue itself.
            let remote_heights_ref = &remote_heights;
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    let mut remote_heights = remote_heights_ref.clone();
                    // No need trying to download certificates the validator didn't have in their
                    // log - we'll retry downloading the remaining ones next time we loop.
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // It makes no sense to return `Ok(_)` if we aren't going to try downloading
                        // anything from the validator - let the function try the other validators
                        return Err(NodeError::MissingCertificateValue);
                    }
                    let certificates = self
                        .requests_scheduler
                        .download_certificates_by_heights(
                            &remote_node,
                            sender_chain_id,
                            remote_heights,
                        )
                        .await?;
                    // Pair every certificate with a flag telling whether its check result
                    // converted to `Ok`; an error from `check_certificate` itself fails
                    // this validator's whole attempt.
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        let check_result = self.check_certificate(&cert).await?;
                        certificates_with_check_results
                            .push((cert, check_result.into_result().is_ok()));
                    }
                    Ok(certificates_with_check_results)
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(errors) => {
                    // Log every reported failure and collect the validators involved.
                    let faulty_validators = errors
                        .into_iter()
                        .map(|(validator, error)| {
                            warn!(
                                %validator,
                                %sender_chain_id,
                                %error,
                                "failed to download certificates from validator",
                            );
                            validator
                        })
                        .collect::<BTreeSet<_>>();
                    // filter out faulty validators and retry if any are left
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that must not be requested again in the next round.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                let height = certificate.block().header.height;
                if !check_result {
                    // The certificate was correctly signed, but we were missing a committee to
                    // validate it properly - do not receive it, but also do not attempt to
                    // re-download it.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // We checked the certificates right after downloading them.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                if let Err(error) = self
                    .receive_sender_certificate(
                        self.storage_client().cache_certificate(certificate),
                        mode,
                        None,
                    )
                    .await
                {
                    // The height stays in the queue, so it is requested again next round.
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            // NOTE(review): if a round removes nothing (no certificate returned, or every
            // receive failed), the queue is unchanged and the loop repeats with the same
            // validators — confirm this cannot spin indefinitely.
            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!("find_received_certificates: finished processing chain");
    }
1574
1575    /// Downloads the log of received messages for a chain from a validator.
1576    #[instrument(level = "trace", skip(self))]
1577    async fn get_received_log_from_validator(
1578        &self,
1579        chain_id: ChainId,
1580        remote_node: &RemoteNode<Env::ValidatorNode>,
1581        tracker: u64,
1582    ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1583        let mut offset = tracker;
1584
1585        // Retrieve the list of newly received certificates from this validator.
1586        let mut remote_log = Vec::new();
1587        loop {
1588            trace!("get_received_log_from_validator: looping");
1589            let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1590            let info = remote_node.handle_chain_info_query(query).await?;
1591            let received_entries = info.requested_received_log.len();
1592            offset += received_entries as u64;
1593            remote_log.extend(info.requested_received_log);
1594            trace!(
1595                remote_node = remote_node.address(),
1596                %received_entries,
1597                "get_received_log_from_validator: received log batch",
1598            );
1599            if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1600                break;
1601            }
1602        }
1603
1604        trace!(
1605            remote_node = remote_node.address(),
1606            num_entries = remote_log.len(),
1607            "get_received_log_from_validator: returning downloaded log",
1608        );
1609
1610        Ok(remote_log)
1611    }
1612
1613    /// Downloads a specific sender block and recursively downloads any earlier blocks
1614    /// that also sent a message to our chain, based on `previous_message_blocks`.
1615    ///
1616    /// This ensures that we have all the sender blocks needed to preprocess the target block
1617    /// and put the messages to our chain into the outbox.
1618    async fn download_sender_block_with_sending_ancestors(
1619        &self,
1620        receiver_chain_id: ChainId,
1621        sender_chain_id: ChainId,
1622        height: BlockHeight,
1623        remote_node: &RemoteNode<Env::ValidatorNode>,
1624    ) -> Result<(), chain_client::Error> {
1625        let next_outbox_height = self
1626            .local_node
1627            .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1628            .await?
1629            .get(&sender_chain_id)
1630            .copied()
1631            .unwrap_or(BlockHeight::ZERO);
1632
1633        // Recursively collect all certificates we need, following
1634        // the chain of previous_message_blocks back to next_outbox_height.
1635        let mut certificates = BTreeMap::new();
1636        let mut current_height = height;
1637        // On the first iteration we only have a height; subsequent iterations
1638        // also carry the hash from `previous_message_blocks`.
1639        let mut current_hash: Option<CryptoHash> = None;
1640
1641        // Stop if we've reached the height we've already processed.
1642        while current_height >= next_outbox_height {
1643            // Try local storage first — avoids a validator round-trip when
1644            // the certificate was already downloaded by a prior sync cycle,
1645            // another receiver chain, or a concurrent notification handler.
1646            let certificate = if let Some(local) = self
1647                .try_read_local_certificate(sender_chain_id, current_height, current_hash)
1648                .await?
1649            {
1650                local
1651            } else {
1652                let downloaded = self
1653                    .requests_scheduler
1654                    .download_certificates_by_heights(
1655                        remote_node,
1656                        sender_chain_id,
1657                        vec![current_height],
1658                    )
1659                    .await?;
1660                let Some(certificate) = downloaded.into_iter().next() else {
1661                    return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1662                        chain_id: sender_chain_id,
1663                        height: current_height,
1664                    });
1665                };
1666                self.storage_client().cache_certificate(certificate)
1667            };
1668
1669            // Validate the certificate.
1670            self.check_certificate(&certificate).await?.into_result()?;
1671
1672            // Check if there's a previous message block to our chain.
1673            let block = certificate.block();
1674            let next = block
1675                .body
1676                .previous_message_blocks
1677                .get(&receiver_chain_id)
1678                .map(|(prev_hash, prev_height)| (*prev_hash, *prev_height));
1679
1680            // Store this certificate.
1681            certificates.insert(current_height, certificate);
1682
1683            if let Some((prev_hash, prev_height)) = next {
1684                // Continue with the previous block (now with its hash for local lookup).
1685                current_height = prev_height;
1686                current_hash = Some(prev_hash);
1687            } else {
1688                // No more dependencies.
1689                break;
1690            }
1691        }
1692
1693        if certificates.is_empty() {
1694            self.retry_pending_cross_chain_requests(sender_chain_id)
1695                .await?;
1696        }
1697
1698        // Process certificates in ascending block height order (BTreeMap keeps them sorted).
1699        for certificate in certificates.into_values() {
1700            self.receive_sender_certificate(
1701                certificate,
1702                ReceiveCertificateMode::AlreadyChecked,
1703                Some(vec![remote_node.clone()]),
1704            )
1705            .await?;
1706        }
1707
1708        Ok(())
1709    }
1710
1711    /// Downloads event-bearing blocks for the given streams by walking the
1712    /// `previous_event_blocks` linked list backwards from `height`, stopping when we
1713    /// reach blocks that are already executed locally or whose events we already track.
1714    async fn download_event_bearing_blocks(
1715        &self,
1716        publisher_chain_id: ChainId,
1717        initial_blocks: BTreeSet<(BlockHeight, CryptoHash)>,
1718        local_next_block_height: BlockHeight,
1719        subscribed_streams: &BTreeSet<StreamId>,
1720        remote_node: &RemoteNode<Env::ValidatorNode>,
1721    ) -> Result<(), chain_client::Error> {
1722        if initial_blocks.is_empty() {
1723            return Ok(());
1724        }
1725
1726        let mut certificates = BTreeMap::new();
1727        let mut blocks_to_fetch = initial_blocks;
1728        let next_expected_events = self
1729            .local_node
1730            .next_expected_events(
1731                publisher_chain_id,
1732                subscribed_streams.iter().cloned().collect(),
1733            )
1734            .await?;
1735
1736        while let Some((current_height, current_hash)) = blocks_to_fetch.pop_last() {
1737            if current_height < local_next_block_height {
1738                continue; // Already executed locally.
1739            }
1740            if certificates.contains_key(&current_height) {
1741                continue;
1742            }
1743
1744            let certificate = if let Some(certificate) =
1745                self.storage_client().read_certificate(current_hash).await?
1746            {
1747                certificate
1748            } else {
1749                let downloaded = self
1750                    .requests_scheduler
1751                    .download_certificates(remote_node, publisher_chain_id, current_height, 1)
1752                    .await?;
1753                let Some(certificate) = downloaded.into_iter().next() else {
1754                    tracing::debug!(
1755                        validator = remote_node.address(),
1756                        %publisher_chain_id,
1757                        height = %current_height,
1758                        "failed to download event publisher block"
1759                    );
1760                    continue;
1761                };
1762
1763                self.check_certificate(&certificate).await?.into_result()?;
1764
1765                self.storage_client().cache_certificate(certificate)
1766            };
1767
1768            let block = certificate.block();
1769            // Walk previous_event_blocks for subscribed streams.
1770            for stream_id in subscribed_streams {
1771                if let Some((prev_hash, prev_height)) =
1772                    block.body.previous_event_blocks.get(stream_id)
1773                {
1774                    if next_expected_events.get(stream_id).is_some_and(|index| {
1775                        block
1776                            .body
1777                            .events
1778                            .iter()
1779                            .flatten()
1780                            .find(|event| event.stream_id == *stream_id)
1781                            .is_some_and(|event| event.index == *index)
1782                    }) {
1783                        continue;
1784                    }
1785                    if !certificates.contains_key(prev_height) {
1786                        blocks_to_fetch.insert((*prev_height, *prev_hash));
1787                    }
1788                }
1789            }
1790
1791            certificates.insert(current_height, certificate);
1792        }
1793
1794        // Process in ascending height order.
1795        for certificate in certificates.into_values() {
1796            self.receive_sender_certificate(
1797                certificate,
1798                ReceiveCertificateMode::AlreadyChecked,
1799                Some(vec![remote_node.clone()]),
1800            )
1801            .await?;
1802        }
1803
1804        Ok(())
1805    }
1806
1807    /// Queries a validator for event-bearing blocks for the given streams, then downloads
1808    /// them.
1809    async fn sync_events_from_node(
1810        &self,
1811        chain_id: ChainId,
1812        stream_ids: &BTreeSet<StreamId>,
1813        remote_node: &RemoteNode<Env::ValidatorNode>,
1814    ) -> Result<(), chain_client::Error> {
1815        let stream_ids_vec = stream_ids.iter().cloned().collect::<Vec<_>>();
1816        let mut initial_blocks = BTreeSet::new();
1817        for chunk in stream_ids_vec.chunks(self.options.max_event_stream_queries) {
1818            let query = ChainInfoQuery::new(chain_id).with_previous_event_blocks(chunk.to_vec());
1819            let info = remote_node.handle_chain_info_query(query).await?;
1820            initial_blocks.extend(info.requested_previous_event_blocks.values().copied());
1821        }
1822        let local_height = match self.local_node.chain_info(chain_id).await {
1823            Ok(info) => info.next_block_height,
1824            Err(LocalNodeError::InactiveChain(_) | LocalNodeError::BlobsNotFound(_)) => {
1825                BlockHeight::ZERO
1826            }
1827            Err(error) => return Err(error.into()),
1828        };
1829        self.download_event_bearing_blocks(
1830            chain_id,
1831            initial_blocks,
1832            local_height,
1833            stream_ids,
1834            remote_node,
1835        )
1836        .await
1837    }
1838
1839    #[instrument(
1840        level = "trace", skip_all,
1841        fields(certificate_hash = ?incoming_certificate.hash()),
1842    )]
1843    async fn check_certificate(
1844        &self,
1845        incoming_certificate: &ConfirmedBlockCertificate,
1846    ) -> Result<CheckCertificateResult, NodeError> {
1847        let epoch = incoming_certificate.block().header.epoch;
1848        let storage = self.storage_client();
1849        let view_err = |error: ExecutionError| NodeError::ViewError {
1850            error: error.to_string(),
1851        };
1852        if storage.is_epoch_revoked(epoch).await.map_err(view_err)? {
1853            return Ok(CheckCertificateResult::OldEpoch);
1854        }
1855        let Some(committee) = storage.committee_for_epoch(epoch).await.map_err(view_err)? else {
1856            return Ok(CheckCertificateResult::FutureEpoch);
1857        };
1858        incoming_certificate.check(&committee)?;
1859        Ok(CheckCertificateResult::New)
1860    }
1861
1862    /// Downloads and processes any certificates we are missing for the given chain.
1863    ///
1864    /// Whether manager values are fetched depends on the chain's follow-only state.
1865    #[instrument(level = "trace", skip_all)]
1866    async fn synchronize_chain_state(
1867        &self,
1868        chain_id: ChainId,
1869    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1870        let (_, committee) = self.admin_committee().await?;
1871        self.synchronize_chain_from_committee(chain_id, committee)
1872            .await
1873    }
1874
1875    /// Downloads certificates for the given chain from the given committee.
1876    ///
1877    /// If the chain is not in follow-only mode, also fetches and processes manager values
1878    /// (timeout certificates, proposals, locking blocks) for consensus participation.
1879    #[instrument(level = "trace", skip_all)]
1880    pub(crate) async fn synchronize_chain_from_committee(
1881        &self,
1882        chain_id: ChainId,
1883        committee: Arc<Committee>,
1884    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1885        #[cfg(with_metrics)]
1886        let _latency = if !self.is_chain_follow_only(chain_id) {
1887            Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1888        } else {
1889            None
1890        };
1891
1892        let validators = self.make_nodes(&committee)?;
1893        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1894        communicate_with_quorum(
1895            &validators,
1896            &committee,
1897            |_: &()| (),
1898            |remote_node| async move {
1899                self.synchronize_chain_state_from(&remote_node, chain_id)
1900                    .await
1901            },
1902            self.options.quorum_grace_period,
1903        )
1904        .await?;
1905
1906        self.local_node
1907            .chain_info(chain_id)
1908            .await
1909            .map_err(Into::into)
1910    }
1911
    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain.
    ///
    /// If the chain is not in follow-only mode, also fetches and processes manager values
    /// (timeout certificates, proposals, locking blocks) for consensus participation.
    ///
    /// Steps, in order:
    ///
    /// 1. Query `remote_node` for its chain info, always asking for the latest checkpoint
    ///    height and, unless the chain is follow-only, for manager values.
    /// 2. If a checkpoint height is reported, call `bootstrap_chain_from_checkpoint`.
    /// 3. Call `download_certificates_from` with the validator's next block height.
    /// 4. Stop here for follow-only chains, or when the local and remote next block
    ///    heights differ.
    /// 5. Process the validator's timeout certificate, locking block and proposals. For a
    ///    proposal the local node rejects, missing blobs, events or cross-chain sender
    ///    blocks are fetched and the proposal is retried; a proposal that still fails is
    ///    logged and skipped.
    ///
    /// # Errors
    ///
    /// Errors from the chain info query, the checkpoint bootstrap, the certificate
    /// download, the timeout certificate, `handle_pending_blobs`,
    /// `update_local_node_with_blobs_from` and
    /// `download_sender_block_with_sending_ancestors` are returned to the caller.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    pub(crate) async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), chain_client::Error> {
        let with_manager_values = !self.is_chain_follow_only(chain_id);
        let query = if with_manager_values {
            ChainInfoQuery::new(chain_id).with_manager_values()
        } else {
            ChainInfoQuery::new(chain_id)
        };
        let query = query.with_latest_checkpoint_height();
        let remote_info = remote_node.handle_chain_info_query(query).await?;

        // If the validator advertises a checkpoint and our local tip is below it, fetch
        // the checkpoint cert and blob and apply the cert directly. The chain worker's
        // `process_confirmed_block` recognises the gap-plus-checkpoint case and installs
        // the chain's execution state from the blob before re-executing the cert. The
        // subsequent `download_certificates_from` call then resumes from the post-
        // checkpoint height and skips downloading any pre-checkpoint blocks.
        if let Some(checkpoint_height) = remote_info.requested_latest_checkpoint_height {
            self.bootstrap_chain_from_checkpoint(remote_node, chain_id, checkpoint_height)
                .await?;
        }

        let local_info = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height, None)
            .await?;

        // Follow-only chains do not need manager values; the certificates are enough.
        if !with_manager_values {
            return Ok(());
        }

        // If we are at the same height as the remote node, we also update our chain manager.
        let local_height = local_info.next_block_height;
        if local_height != remote_info.next_block_height {
            debug!(
                remote_node = remote_node.address(),
                remote_height = %remote_info.next_block_height,
                local_height = %local_height,
                "synced from validator, but remote height and local height are different",
            );
            return Ok(());
        };

        // A failure to process the timeout certificate aborts the whole synchronization.
        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(*timeout).await?;
        }
        // Collect the proposals to replay locally: signed proposal first, then the
        // proposed one, then a fast-round locking proposal, if present.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    // A regular locking block is a certificate: process it right away and
                    // only log a failure.
                    let hash = cert.hash();
                    if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                    {
                        debug!(
                            remote_node = remote_node.address(),
                            %hash,
                            height = %local_height,
                            %error,
                            "skipping locked block from validator",
                        );
                    }
                }
            }
        }
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            // `err` is mutable: each failed retry below replaces it with the newest error,
            // which then decides which recovery step applies next.
            if let Err(mut err) = self
                .local_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    // First attempt: download the blobs required by the proposal itself as
                    // pending blobs from this validator.
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match self
                                .requests_scheduler
                                .download_pending_blob(remote_node, chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(error) => {
                                    info!(
                                        remote_node = remote_node.address(),
                                        height = %local_height,
                                        proposer = %owner,
                                        %blob_id,
                                        %error,
                                        "skipping proposal from validator; failed to download blob",
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            // Accepted on retry: move on to the next proposal.
                            continue;
                        }
                    }
                    // Second attempt: blobs are still reported missing, so fetch the
                    // certificates that use them from this validator.
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            slice::from_ref(remote_node),
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }
                if let LocalNodeError::EventsNotFound(event_ids) = &err {
                    if let Err(error) =
                        Box::pin(self.download_certificates_for_events(event_ids)).await
                    {
                        info!(
                            remote_node = remote_node.address(),
                            height = %local_height,
                            proposer = %owner,
                            %error,
                            "skipping proposal from validator; failed to download events",
                        );
                        continue 'proposal_loop;
                    }
                    // We found the missing publisher chain data: retry.
                    if let Err(new_err) = self
                        .local_node
                        .handle_block_proposal(proposal.clone())
                        .await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
                // Resolve missing cross-chain updates one at a time: each round downloads
                // the reported sender block (and its sending ancestors) and retries.
                // NOTE(review): the loop keeps going for as long as retries report
                // `MissingCrossChainUpdate` — confirm that the same update cannot be
                // reported indefinitely.
                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                    if let ChainError::MissingCrossChainUpdate {
                        chain_id,
                        origin,
                        height,
                    } = &**chain_err
                    {
                        self.download_sender_block_with_sending_ancestors(
                            *chain_id,
                            *origin,
                            *height,
                            remote_node,
                        )
                        .await?;
                        // Retry
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue 'proposal_loop;
                        }
                    } else {
                        // Any other chain error cannot be repaired here.
                        break;
                    }
                }

                // Nothing more to try for this proposal: log the last error and go on.
                debug!(
                    remote_node = remote_node.address(),
                    proposer = %owner,
                    height = %local_height,
                    error = %err,
                    "skipping proposal from validator",
                );
            }
        }
        Ok(())
    }
2121
2122    async fn try_process_locking_block_from(
2123        &self,
2124        remote_node: &RemoteNode<Env::ValidatorNode>,
2125        certificate: GenericCertificate<ValidatedBlock>,
2126    ) -> Result<(), chain_client::Error> {
2127        let chain_id = certificate.inner().chain_id();
2128        let mut downloaded_blobs = HashSet::<BlobId>::new();
2129        let mut events = EventSetDownloader::new(self);
2130        loop {
2131            let result = self.handle_certificate(certificate.clone()).await;
2132            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2133                let new_blobs = filter_new(blob_ids, &downloaded_blobs);
2134                if !new_blobs.is_empty() {
2135                    let mut blobs = Vec::new();
2136                    for blob_id in &new_blobs {
2137                        let blob_content = self
2138                            .requests_scheduler
2139                            .download_pending_blob(remote_node, chain_id, *blob_id)
2140                            .await?;
2141                        blobs.push(Blob::new(blob_content));
2142                    }
2143                    self.local_node
2144                        .handle_pending_blobs(chain_id, blobs)
2145                        .await?;
2146                    downloaded_blobs.extend(new_blobs);
2147                    continue;
2148                }
2149            }
2150            if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
2151                if events.download_new(event_ids).await? {
2152                    continue;
2153                }
2154            }
2155            result?;
2156            return Ok(());
2157        }
2158    }
2159
2160    /// Downloads and processes from the specified validators a confirmed block certificates that
2161    /// use the given blobs. If this succeeds, the blob will be in our storage.
2162    async fn update_local_node_with_blobs_from(
2163        &self,
2164        blob_ids: Vec<BlobId>,
2165        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
2166    ) -> Result<Vec<CacheArc<Blob>>, chain_client::Error> {
2167        let timeout = self.options.blob_download_timeout;
2168        // Deduplicate IDs.
2169        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
2170        stream::iter(blob_ids.into_iter().map(|blob_id| {
2171            communicate_concurrently(
2172                remote_nodes,
2173                async move |remote_node| {
2174                    let certificate = self
2175                        .requests_scheduler
2176                        .download_certificate_for_blob(&remote_node, blob_id)
2177                        .await?;
2178                    self.receive_sender_certificate(
2179                        self.storage_client().cache_certificate(certificate),
2180                        ReceiveCertificateMode::NeedsCheck,
2181                        Some(vec![remote_node.clone()]),
2182                    )
2183                    .await?;
2184                    let blob = self
2185                        .local_node
2186                        .storage_client()
2187                        .read_blob(blob_id)
2188                        .await?
2189                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
2190                    Result::<_, chain_client::Error>::Ok(blob)
2191                },
2192                timeout,
2193            )
2194            .map_err(move |errors| {
2195                for (validator, error) in &errors {
2196                    warn!(
2197                        %validator,
2198                        %blob_id,
2199                        %error,
2200                        "failed to download certificate-for-blob from validator",
2201                    );
2202                }
2203                chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id]))
2204            })
2205        }))
2206        .buffer_unordered(self.options.max_joined_tasks)
2207        .collect::<Vec<_>>()
2208        .await
2209        .into_iter()
2210        .collect()
2211    }
2212
2213    /// Attempts to execute the block locally. If any incoming message execution fails, that
2214    /// message is rejected and execution is retried, until the block accepts only messages
2215    /// that succeed.
2216    ///
2217    /// Attempts to execute the block locally with a specified policy for handling bundle failures.
2218    /// If any attempt to read a blob fails, the blob is downloaded and execution is retried.
2219    ///
2220    /// Returns the modified block (bundles may be rejected/removed based on the policy)
2221    /// and the execution result.
2222    #[instrument(level = "trace", skip(self, block))]
2223    async fn stage_block_execution(
2224        &self,
2225        block: ProposedBlock,
2226        round: Option<u32>,
2227        published_blobs: Vec<Blob>,
2228        policy: BundleExecutionPolicy,
2229    ) -> Result<(Block, ChainInfoResponse, HashSet<ChainId>), chain_client::Error> {
2230        let mut events = EventSetDownloader::new(self);
2231        loop {
2232            let result = self
2233                .local_node
2234                .stage_block_execution(
2235                    block.clone(),
2236                    round,
2237                    published_blobs.clone(),
2238                    policy.clone(),
2239                )
2240                .await;
2241            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2242                let validators = self.validator_nodes().await?;
2243                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
2244                    .await?;
2245                continue; // We found the missing blob: retry.
2246            }
2247            if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
2248                if events.download_new(event_ids).await? {
2249                    continue; // We downloaded new publisher chain data: retry.
2250                }
2251                // All reported events were already downloaded; don't loop forever.
2252            }
2253            if let Ok((_, executed_block, _, _, _)) = &result {
2254                let hash = CryptoHash::new(executed_block);
2255                let notification = Notification {
2256                    chain_id: executed_block.header.chain_id,
2257                    reason: Reason::BlockExecuted {
2258                        height: executed_block.header.height,
2259                        hash,
2260                    },
2261                };
2262                self.notifier.notify(&[notification]);
2263            }
2264            let (
2265                _modified_block,
2266                executed_block,
2267                response,
2268                _resource_tracker,
2269                never_reject_origins,
2270            ) = result?;
2271            return Ok((executed_block, response, never_reject_origins));
2272        }
2273    }
2274}
2275
/// Returns clones of the items in `ids` that are not contained in `already_downloaded`.
///
/// The relative order of `ids` is kept, and an item that occurs several times in `ids`
/// also occurs several times in the result.
fn filter_new<T: Clone + Eq + std::hash::Hash>(
    ids: &[T],
    already_downloaded: &HashSet<T>,
) -> Vec<T> {
    let mut fresh = Vec::new();
    for id in ids {
        if !already_downloaded.contains(id) {
            fresh.push(id.clone());
        }
    }
    fresh
}
2286
2287/// Per-call deduplication for an event-download retry loop. Holds the set of
2288/// events the loop has already downloaded and the [`Client`] used to fetch new
2289/// ones — call `download_new` each time the inner operation reports
2290/// `EventsNotFound` and continue the loop only if it returns `true`.
2291pub(crate) struct EventSetDownloader<'a, Env: Environment> {
2292    client: &'a Client<Env>,
2293    downloaded: HashSet<EventId>,
2294}
2295
2296impl<'a, Env: Environment> EventSetDownloader<'a, Env> {
2297    pub(crate) fn new(client: &'a Client<Env>) -> Self {
2298        Self {
2299            client,
2300            downloaded: HashSet::new(),
2301        }
2302    }
2303
2304    /// If any of `event_ids` haven't been downloaded yet, fetches the publisher
2305    /// certificates that contain them and returns `true`. Returns `false`
2306    /// (without downloading) if every reported event has already been
2307    /// downloaded — that prevents an infinite retry loop when the events are
2308    /// genuinely unavailable.
2309    pub(crate) async fn download_new(
2310        &mut self,
2311        event_ids: &[EventId],
2312    ) -> Result<bool, chain_client::Error> {
2313        let new_events = filter_new(event_ids, &self.downloaded);
2314        if new_events.is_empty() {
2315            return Ok(false);
2316        }
2317        Box::pin(self.client.download_certificates_for_events(&new_events)).await?;
2318        self.downloaded.extend(new_events);
2319        Ok(true)
2320    }
2321}
2322
2323/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay
2324/// on each subsequent node. Returns the first `Ok` result; if all of the nodes fail, returns
2325/// the collected `(validator, error)` pairs.
2326async fn communicate_concurrently<'a, A, E, F, R, V>(
2327    nodes: &[RemoteNode<A>],
2328    f: F,
2329    timeout: Duration,
2330) -> Result<V, Vec<(ValidatorPublicKey, E)>>
2331where
2332    F: Clone + FnOnce(RemoteNode<A>) -> R,
2333    RemoteNode<A>: Clone,
2334    R: Future<Output = Result<V, E>> + 'a,
2335{
2336    let mut nodes = nodes.to_vec();
2337    nodes.shuffle(&mut rand::thread_rng());
2338    let mut stream = nodes
2339        .iter()
2340        .zip(0..)
2341        .map(|(remote_node, i)| {
2342            let fun = f.clone();
2343            let node = remote_node.clone();
2344            async move {
2345                linera_base::time::timer::sleep(timeout * i * i).await;
2346                fun(node).await.map_err(|err| (remote_node.public_key, err))
2347            }
2348        })
2349        .collect::<FuturesUnordered<_>>();
2350    let mut errors = vec![];
2351    while let Some(maybe_result) = stream.next().await {
2352        match maybe_result {
2353            Ok(result) => return Ok(result),
2354            Err(error) => errors.push(error),
2355        };
2356    }
2357    Err(errors)
2358}
2359
2360/// Wrapper for `AbortHandle` that aborts when its dropped.
2361#[must_use]
2362pub struct AbortOnDrop(pub AbortHandle);
2363
2364impl Drop for AbortOnDrop {
2365    #[instrument(level = "trace", skip(self))]
2366    fn drop(&mut self) {
2367        self.0.abort();
2368    }
2369}
2370
2371/// A pending proposed block, together with its published blobs.
2372#[derive(Clone, Serialize, Deserialize)]
2373pub struct PendingProposal {
2374    pub block: ProposedBlock,
2375    pub blobs: Vec<Blob>,
2376    /// The execution outcome from the AutoRetry execution, used as a sanity check
2377    /// against the committed execution outcome.
2378    #[serde(default)]
2379    pub auto_retry_outcome: Option<BlockExecutionOutcome>,
2380    /// The round in which this proposal was first submitted, if any.
2381    #[serde(default)]
2382    pub round: Option<Round>,
2383}
2384
/// Passed along with a received certificate to say whether it still has to be checked.
///
/// NOTE(review): the exact check that is performed or skipped is implemented by the code
/// that consumes this value, outside this section; confirm there.
enum ReceiveCertificateMode {
    /// The certificate has not been checked yet.
    NeedsCheck,
    /// The certificate has already been checked.
    AlreadyChecked,
}
2389
2390enum CheckCertificateResult {
2391    /// The certificate's epoch has been revoked on the admin chain.
2392    OldEpoch,
2393    /// The committee for the certificate's epoch is unknown to us yet, e.g. because
2394    /// our local view of the admin chain is behind.
2395    FutureEpoch,
2396    New,
2397}
2398
2399impl CheckCertificateResult {
2400    fn into_result(self) -> Result<(), chain_client::Error> {
2401        match self {
2402            Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
2403            Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
2404            Self::New => Ok(()),
2405        }
2406    }
2407}
2408
2409/// Creates a compressed Contract, Service and bytecode, plus an optional
2410/// `ApplicationFormats` blob built from the JSON-encoded `Formats` description
2411/// bytes.
2412#[cfg(not(target_arch = "wasm32"))]
2413pub async fn create_bytecode_blobs(
2414    contract: Bytecode,
2415    service: Bytecode,
2416    vm_runtime: VmRuntime,
2417    formats: Option<Vec<u8>>,
2418) -> (Vec<Blob>, ModuleId) {
2419    let formats_blob = formats.map(Blob::new_application_formats);
2420    let formats_blob_hash = formats_blob.as_ref().map(|blob| blob.id().hash);
2421    let (mut blobs, module_id) = match vm_runtime {
2422        VmRuntime::Wasm => {
2423            let (compressed_contract, compressed_service) =
2424                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
2425                    .await
2426                    .expect("Compression should not panic");
2427            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
2428            let service_blob = Blob::new_service_bytecode(compressed_service);
2429            let module_id = ModuleId::new_with_formats(
2430                contract_blob.id().hash,
2431                service_blob.id().hash,
2432                vm_runtime,
2433                formats_blob_hash,
2434            );
2435            (vec![contract_blob, service_blob], module_id)
2436        }
2437        VmRuntime::Evm => {
2438            let compressed_contract = contract.compress();
2439            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
2440            let module_id = ModuleId::new_with_formats(
2441                evm_contract_blob.id().hash,
2442                evm_contract_blob.id().hash,
2443                vm_runtime,
2444                formats_blob_hash,
2445            );
2446            (vec![evm_contract_blob], module_id)
2447        }
2448    };
2449    if let Some(blob) = formats_blob {
2450        blobs.push(blob);
2451    }
2452    (blobs, module_id)
2453}